diff --git a/apis/apps/v1/types.go b/apis/apps/v1/types.go index 6f8813f6568..e18dbc50ba4 100644 --- a/apis/apps/v1/types.go +++ b/apis/apps/v1/types.go @@ -252,10 +252,42 @@ type ComponentNetwork struct { // +optional DNSPolicy *corev1.DNSPolicy `json:"dnsPolicy,omitempty"` - // Specifies the DNS parameters of a pod. + // Specifies the DNS parameters of the pod. // // +optional DNSConfig *corev1.PodDNSConfig `json:"dnsConfig,omitempty"` + + // HostPorts specifies the mapping of container ports to host ports. + // The behavior varies based on the HostNetwork setting: + // + // 1. When HostNetwork is enabled: + // - If this field is empty: All ports are automatically allocated by the host-port manager. + // - If this field is specified: + // a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + // b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + // You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + // + // 2. When HostNetwork is disabled: + // It allows optional mapping for container ports to host ports. + // - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + // - Any specified container ports not present in the runtime definition will be ignored. + // + // +optional + HostPorts []HostPort `json:"hostPorts,omitempty"` +} + +type HostPort struct { + // The name of the container port. + // + // +kubebuilder:validation:Required + Name string `json:"name"` + + // The port number of the host port. + // + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:validation:Required + Port int32 `json:"port"` } type Service struct { diff --git a/apis/apps/v1/zz_generated.deepcopy.go b/apis/apps/v1/zz_generated.deepcopy.go index 54db5be0e7c..b76820cde11 100644 --- a/apis/apps/v1/zz_generated.deepcopy.go +++ b/apis/apps/v1/zz_generated.deepcopy.go @@ -1441,6 +1441,11 @@ func (in *ComponentNetwork) DeepCopyInto(out *ComponentNetwork) { *out = new(corev1.PodDNSConfig) (*in).DeepCopyInto(*out) } + if in.HostPorts != nil { + in, out := &in.HostPorts, &out.HostPorts + *out = make([]HostPort, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentNetwork. @@ -2283,6 +2288,21 @@ func (in *HostNetworkVars) DeepCopy() *HostNetworkVars { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HostPort) DeepCopyInto(out *HostPort) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HostPort. +func (in *HostPort) DeepCopy() *HostPort { + if in == nil { + return nil + } + out := new(HostPort) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InstanceTemplate) DeepCopyInto(out *InstanceTemplate) { *out = *in diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 5f9e4f8dc54..7b5f1fda5a7 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -417,7 +417,7 @@ func main() { client = multiClusterMgr.GetClient() } - if err := intctrlutil.InitHostPortManager(mgr.GetClient()); err != nil { + if err := intctrlutil.InitDefaultHostPortManager(mgr.GetClient()); err != nil { setupLog.Error(err, "unable to init port manager") os.Exit(1) } diff --git a/config/crd/bases/apps.kubeblocks.io_clusters.yaml b/config/crd/bases/apps.kubeblocks.io_clusters.yaml index 24360af49a8..b4a4448c603 100644 --- a/config/crd/bases/apps.kubeblocks.io_clusters.yaml +++ b/config/crd/bases/apps.kubeblocks.io_clusters.yaml @@ -2709,7 +2709,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -2774,6 +2774,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- @@ -13959,7 +13993,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -14024,6 +14058,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- diff --git a/config/crd/bases/apps.kubeblocks.io_components.yaml b/config/crd/bases/apps.kubeblocks.io_components.yaml index 7ed168a4bad..1ba48b17bcb 100644 --- a/config/crd/bases/apps.kubeblocks.io_components.yaml +++ b/config/crd/bases/apps.kubeblocks.io_components.yaml @@ -2488,7 +2488,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -2553,6 +2553,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- diff --git a/controllers/apps/cluster/suite_test.go b/controllers/apps/cluster/suite_test.go index 07966b44d83..0fa38de4d5e 100644 --- a/controllers/apps/cluster/suite_test.go +++ b/controllers/apps/cluster/suite_test.go @@ -162,7 +162,7 @@ var _ = BeforeSuite(func() { viper.SetDefault("HOST_PORT_CM_NAME", "kubeblocks-host-ports") viper.SetDefault(constant.EnableRBACManager, true) - err = intctrlutil.InitHostPortManager(k8sClient) + err = intctrlutil.InitDefaultHostPortManager(k8sClient) Expect(err).ToNot(HaveOccurred()) err = (&apps.ClusterDefinitionReconciler{ diff --git a/controllers/apps/component/component_controller.go b/controllers/apps/component/component_controller.go index cb307e7ac0a..36e3b3f512f 100644 --- a/controllers/apps/component/component_controller.go +++ b/controllers/apps/component/component_controller.go @@ -151,6 +151,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( &componentMonitorContainerTransformer{}, // allocate ports for host-network component &componentHostNetworkTransformer{}, + // map for container ports to host ports + &componentHostPortTransformer{}, // handle component services &componentServiceTransformer{}, // handle component system accounts diff --git a/controllers/apps/component/suite_test.go b/controllers/apps/component/suite_test.go index 6b9dc8cb3f3..15aad719be7 100644 --- a/controllers/apps/component/suite_test.go +++ b/controllers/apps/component/suite_test.go @@ -168,7 +168,7 @@ var _ = BeforeSuite(func() { viper.SetDefault("HOST_PORT_CM_NAME", "kubeblocks-host-ports") viper.SetDefault(constant.EnableRBACManager, true) - err = intctrlutil.InitHostPortManager(k8sClient) + err = intctrlutil.InitDefaultHostPortManager(k8sClient) Expect(err).ToNot(HaveOccurred()) err = (&apps.ComponentDefinitionReconciler{ diff --git a/controllers/apps/component/transformer_component_deletion.go b/controllers/apps/component/transformer_component_deletion.go index 40e12169eae..5b60d717ca1 100644 --- a/controllers/apps/component/transformer_component_deletion.go +++ b/controllers/apps/component/transformer_component_deletion.go @@ -147,7 +147,7 @@ func (t *componentDeletionTransformer) deleteCompResources(transCtx *componentTr } // release the allocated host-network ports for the component - pm := intctrlutil.GetPortManager() + pm := intctrlutil.GetPortManager(comp.Spec.Network) if err = pm.ReleaseByPrefix(comp.Name); err != nil { return intctrlutil.NewRequeueError(time.Second*1, fmt.Sprintf("release host ports for component %s error: %s", comp.Name, err.Error())) } diff --git a/controllers/apps/component/transformer_component_hostnetwork.go b/controllers/apps/component/transformer_component_hostnetwork.go index ea76d8cd8a0..e5ba9504b17 100644 --- a/controllers/apps/component/transformer_component_hostnetwork.go +++ b/controllers/apps/component/transformer_component_hostnetwork.go @@ -43,18 +43,18 @@ func (t *componentHostNetworkTransformer) Transform(ctx graph.TransformContext, } synthesizedComp := transCtx.SynthesizeComponent - ports, err := allocateHostPorts(synthesizedComp) + ports, err := t.allocateHostPorts(synthesizedComp) if err != nil { return err } comp := transCtx.Component - updateObjectsWithAllocatedPorts(synthesizedComp, comp, ports) + t.updateObjectsWithAllocatedPorts(synthesizedComp, comp, ports) return nil } -func allocateHostPorts(synthesizedComp *component.SynthesizedComponent) (map[string]map[string]int32, error) { +func (t *componentHostNetworkTransformer) allocateHostPorts(synthesizedComp *component.SynthesizedComponent) (map[string]map[string]int32, error) { ports := map[string]map[string]bool{} for _, c := range synthesizedComp.HostNetwork.ContainerPorts { for _, p := range c.Ports { @@ -65,7 +65,7 @@ func allocateHostPorts(synthesizedComp *component.SynthesizedComponent) (map[str } } - pm := intctrlutil.GetPortManager() + pm := intctrlutil.GetPortManager(synthesizedComp.Network) needAllocate := func(c string, p string) bool { containerPorts, ok := ports[c] if !ok { @@ -73,10 +73,10 @@ func allocateHostPorts(synthesizedComp *component.SynthesizedComponent) (map[str } return containerPorts[p] } - return allocateHostPortsWithFunc(pm, synthesizedComp, needAllocate) + return t.allocateHostPortsWithFunc(pm, synthesizedComp, needAllocate) } -func allocateHostPortsWithFunc(pm *intctrlutil.PortManager, synthesizedComp *component.SynthesizedComponent, +func (t *componentHostNetworkTransformer) allocateHostPortsWithFunc(pm intctrlutil.PortManager, synthesizedComp *component.SynthesizedComponent, needAllocate func(string, string) bool) (map[string]map[string]int32, error) { ports := map[string]map[string]int32{} insert := func(c, pk string, pv int32) { @@ -87,7 +87,7 @@ func allocateHostPortsWithFunc(pm *intctrlutil.PortManager, synthesizedComp *com } for _, c := range synthesizedComp.PodSpec.Containers { for _, p := range c.Ports { - portKey := intctrlutil.BuildHostPortName(synthesizedComp.ClusterName, synthesizedComp.Name, c.Name, p.Name) + portKey := pm.PortKey(synthesizedComp.ClusterName, synthesizedComp.Name, c.Name, p.Name) if needAllocate(c.Name, p.Name) { port, err := pm.AllocatePort(portKey) if err != nil { @@ -104,7 +104,7 @@ func allocateHostPortsWithFunc(pm *intctrlutil.PortManager, synthesizedComp *com return ports, nil } -func updateObjectsWithAllocatedPorts(synthesizedComp *component.SynthesizedComponent, +func (t *componentHostNetworkTransformer) updateObjectsWithAllocatedPorts(synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component, ports map[string]map[string]int32) { synthesizedComp.PodSpec.HostNetwork = true if comp.Spec.Network != nil && comp.Spec.Network.DNSPolicy != nil { diff --git a/controllers/apps/component/transformer_component_hostport.go b/controllers/apps/component/transformer_component_hostport.go new file mode 100644 index 00000000000..98d22fd915f --- /dev/null +++ b/controllers/apps/component/transformer_component_hostport.go @@ -0,0 +1,58 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package component + +import ( + "github.com/apecloud/kubeblocks/pkg/controller/graph" +) + +type componentHostPortTransformer struct{} + +var _ graph.Transformer = &componentHostPortTransformer{} + +func (t *componentHostPortTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error { + transCtx, _ := ctx.(*componentTransformContext) + if isCompDeleting(transCtx.ComponentOrig) { + return nil + } + + synthesizedComp := transCtx.SynthesizeComponent + if synthesizedComp == nil || + synthesizedComp.PodSpec.HostNetwork || + synthesizedComp.Network == nil || + synthesizedComp.Network.HostNetwork { + return nil + } + + ports := map[string]int32{} + for _, hostPort := range synthesizedComp.Network.HostPorts { + ports[hostPort.Name] = hostPort.Port + } + if len(ports) > 0 { + for i, c := range synthesizedComp.PodSpec.Containers { + for j, p := range c.Ports { + if hostPort, ok := ports[p.Name]; ok { + synthesizedComp.PodSpec.Containers[i].Ports[j].HostPort = hostPort + } + } + } + } + return nil +} diff --git a/controllers/apps/rollout/suite_test.go b/controllers/apps/rollout/suite_test.go index 0488d912d58..ecc4c57b720 100644 --- a/controllers/apps/rollout/suite_test.go +++ b/controllers/apps/rollout/suite_test.go @@ -124,7 +124,7 @@ var _ = BeforeSuite(func() { viper.SetDefault("HOST_PORT_CM_NAME", "kubeblocks-host-ports") viper.SetDefault(constant.EnableRBACManager, true) - err = intctrlutil.InitHostPortManager(k8sClient) + err = intctrlutil.InitDefaultHostPortManager(k8sClient) Expect(err).ToNot(HaveOccurred()) err = (&RolloutReconciler{ diff --git a/controllers/apps/suite_test.go b/controllers/apps/suite_test.go index 4264e7e2fa5..6db9c2bfe05 100644 --- a/controllers/apps/suite_test.go +++ b/controllers/apps/suite_test.go @@ -149,7 +149,7 @@ var _ = BeforeSuite(func() { viper.SetDefault("HOST_PORT_CM_NAME", "kubeblocks-host-ports") viper.SetDefault(constant.EnableRBACManager, true) - err = intctrlutil.InitHostPortManager(k8sClient) + err = intctrlutil.InitDefaultHostPortManager(k8sClient) Expect(err).ToNot(HaveOccurred()) err = (&ClusterDefinitionReconciler{ diff --git a/controllers/operations/suite_test.go b/controllers/operations/suite_test.go index 31196414768..3918f96fa0d 100644 --- a/controllers/operations/suite_test.go +++ b/controllers/operations/suite_test.go @@ -179,7 +179,7 @@ var _ = BeforeSuite(func() { viper.SetDefault("HOST_PORT_CM_NAME", "kubeblocks-host-ports") viper.SetDefault(constant.EnableRBACManager, true) - err = intctrlutil.InitHostPortManager(k8sClient) + err = intctrlutil.InitDefaultHostPortManager(k8sClient) Expect(err).ToNot(HaveOccurred()) clusterRecorder = k8sManager.GetEventRecorderFor("cluster-controller") diff --git a/deploy/helm/crds/apps.kubeblocks.io_clusters.yaml b/deploy/helm/crds/apps.kubeblocks.io_clusters.yaml index 24360af49a8..b4a4448c603 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_clusters.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_clusters.yaml @@ -2709,7 +2709,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -2774,6 +2774,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- @@ -13959,7 +13993,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -14024,6 +14058,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- diff --git a/deploy/helm/crds/apps.kubeblocks.io_components.yaml b/deploy/helm/crds/apps.kubeblocks.io_components.yaml index 7ed168a4bad..1ba48b17bcb 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_components.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_components.yaml @@ -2488,7 +2488,7 @@ spec: description: Defines the network configuration for the Component. properties: dnsConfig: - description: Specifies the DNS parameters of a pod. + description: Specifies the DNS parameters of the pod. properties: nameservers: description: |- @@ -2553,6 +2553,40 @@ spec: description: Host networking requested for this pod. Use the host's network namespace. type: boolean + hostPorts: + description: |- + HostPorts specifies the mapping of container ports to host ports. + The behavior varies based on the HostNetwork setting: + + + 1. When HostNetwork is enabled: + - If this field is empty: All ports are automatically allocated by the host-port manager. + - If this field is specified: + a) Mappings for all ports defined in `cmpd.spec.hostNetwork` are MANDATORY. + b) Mappings for kbagent ports ("http", "streaming") are OPTIONAL. + You can explicitly map them here, or leave them omitted to be allocated by the host-port manager. + + + 2. When HostNetwork is disabled: + It allows optional mapping for container ports to host ports. + - Mappings are restricted to ports defined in `cmpd.spec.runtime.containers.ports`. + - Any specified container ports not present in the runtime definition will be ignored. + items: + properties: + name: + description: The name of the container port. + type: string + port: + description: The port number of the host port. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + - port + type: object + type: array type: object offlineInstances: description: |- diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md index 438c1391026..bd5f4f5aea8 100644 --- a/docs/developer_docs/api-reference/cluster.md +++ b/docs/developer_docs/api-reference/cluster.md @@ -6153,7 +6153,38 @@ Kubernetes core/v1.PodDNSConfig (Optional) -

Specifies the DNS parameters of a pod.

+

Specifies the DNS parameters of the pod.

+ + + + +hostPorts
+ + +[]HostPort + + + + +(Optional) +

HostPorts specifies the mapping of container ports to host ports. +The behavior varies based on the HostNetwork setting:

+
    +
  1. When HostNetwork is enabled:

    +
      +
    • If this field is empty: All ports are automatically allocated by the host-port manager.
    • +
    • If this field is specified: +a) Mappings for all ports defined in cmpd.spec.hostNetwork are MANDATORY. +b) Mappings for kbagent ports (“http”, “streaming”) are OPTIONAL. +You can explicitly map them here, or leave them omitted to be allocated by the host-port manager.
    • +
  2. +
  3. When HostNetwork is disabled: +It allows optional mapping for container ports to host ports.

    +
      +
    • Mappings are restricted to ports defined in cmpd.spec.runtime.containers.ports.
    • +
    • Any specified container ports not present in the runtime definition will be ignored.
    • +
  4. +
@@ -8430,6 +8461,45 @@ ContainerVars +

HostPort +

+

+(Appears on:ComponentNetwork) +

+
+
+ + + + + + + + + + + + + + + + + +
FieldDescription
+name
+ +string + +
+

The name of the container port.

+
+port
+ +int32 + +
+

The port number of the host port.

+

InstanceTemplate

diff --git a/pkg/controller/component/synthesize_component.go b/pkg/controller/component/synthesize_component.go index 958cd583cd9..9fc52a96bd1 100644 --- a/pkg/controller/component/synthesize_component.go +++ b/pkg/controller/component/synthesize_component.go @@ -96,6 +96,7 @@ func BuildSynthesizedComponent(ctx context.Context, cli client.Reader, StaticAnnotations: compDef.Spec.Annotations, DynamicAnnotations: comp.Spec.Annotations, PodSpec: &compDef.Spec.Runtime, + Network: comp.Spec.Network, HostNetwork: compDefObj.Spec.HostNetwork, ComponentServices: compDefObj.Spec.Services, LogConfigs: compDefObj.Spec.LogConfigs, diff --git a/pkg/controller/component/type.go b/pkg/controller/component/type.go index 7f60914a2f5..829aefd8300 100644 --- a/pkg/controller/component/type.go +++ b/pkg/controller/component/type.go @@ -75,10 +75,11 @@ type SynthesizedComponent struct { LifecycleActions *kbappsv1.ComponentLifecycleActions `json:"lifecycleActions,omitempty"` SystemAccounts []kbappsv1.SystemAccount `json:"systemAccounts,omitempty"` Volumes []kbappsv1.ComponentVolume `json:"volumes,omitempty"` - HostNetwork *kbappsv1.HostNetwork `json:"hostNetwork,omitempty"` - ComponentServices []kbappsv1.ComponentService `json:"componentServices,omitempty"` - MinReadySeconds int32 `json:"minReadySeconds,omitempty"` - DisableExporter *bool `json:"disableExporter,omitempty"` + Network *kbappsv1.ComponentNetwork + HostNetwork *kbappsv1.HostNetwork `json:"hostNetwork,omitempty"` + ComponentServices []kbappsv1.ComponentService `json:"componentServices,omitempty"` + MinReadySeconds int32 `json:"minReadySeconds,omitempty"` + DisableExporter *bool `json:"disableExporter,omitempty"` Stop *bool EnableInstanceAPI *bool InstanceAssistantObjects []corev1.ObjectReference diff --git a/pkg/controller/component/utils.go b/pkg/controller/component/utils.go index 0f2f1d6b8da..2d7a3835949 100644 --- a/pkg/controller/component/utils.go +++ b/pkg/controller/component/utils.go @@ -152,6 +152,9 @@ func hasHostNetworkEnabled(synthesizedComp *SynthesizedComponent, if synthesizedComp != nil && synthesizedComp.PodSpec.HostNetwork { return true } + if synthesizedComp != nil && synthesizedComp.Network != nil && synthesizedComp.Network.HostNetwork { + return true + } if comp != nil && comp.Spec.Network != nil && comp.Spec.Network.HostNetwork { return true } @@ -165,33 +168,15 @@ func hasHostNetworkEnabled(synthesizedComp *SynthesizedComponent, return slices.Index(strings.Split(comps, ","), compName) >= 0 } -func getHostNetworkPort(ctx context.Context, _ client.Reader, clusterName, compName, cName, pName string) (int32, error) { - key := intctrlutil.BuildHostPortName(clusterName, compName, cName, pName) - if v, ok := ctx.Value(mockHostNetworkPortManagerKey{}).(map[string]int32); ok { - if p, okk := v[key]; okk { - return p, nil - } - return 0, nil - } - pm := intctrlutil.GetPortManager() +func getHostNetworkPort(synthesizedComp *SynthesizedComponent, clusterName, compName, cName, pName string) (int32, error) { + pm := intctrlutil.GetPortManager(synthesizedComp.Network) if pm == nil { return 0, nil } + key := pm.PortKey(clusterName, compName, cName, pName) return pm.GetPort(key) } -func mockHostNetworkPort(ctx context.Context, _ client.Reader, clusterName, compName, cName, pName string, port int32) context.Context { - key := intctrlutil.BuildHostPortName(clusterName, compName, cName, pName) - mockHostNetworkPortManager[key] = port - return context.WithValue(ctx, mockHostNetworkPortManagerKey{}, mockHostNetworkPortManager) -} - -var ( - mockHostNetworkPortManager = map[string]int32{} -) - -type mockHostNetworkPortManagerKey struct{} - func UDFReconfigureActionName(tpl SynthesizedFileTemplate) string { return fmt.Sprintf("reconfigure-%s", tpl.Name) } diff --git a/pkg/controller/component/vars.go b/pkg/controller/component/vars.go index bf95518a8fc..6232eec00f0 100644 --- a/pkg/controller/component/vars.go +++ b/pkg/controller/component/vars.go @@ -458,7 +458,8 @@ func resolveHostNetworkPortRef(ctx context.Context, cli client.Reader, synthesiz defineKey string, selector appsv1.HostNetworkVarSelector) ([]*corev1.EnvVar, []*corev1.EnvVar, error) { resolvePort := func(obj any) (*corev1.EnvVar, *corev1.EnvVar, error) { compName := obj.(string) - port, _ := getHostNetworkPort(ctx, cli, synthesizedComp.ClusterName, compName, selector.Container.Name, selector.Container.Port.Name) + port, _ := getHostNetworkPort(synthesizedComp, + synthesizedComp.ClusterName, compName, selector.Container.Name, selector.Container.Port.Name) if port > 0 { return &corev1.EnvVar{ Name: defineKey, diff --git a/pkg/controller/component/vars_test.go b/pkg/controller/component/vars_test.go index 2c3b7d8c91b..405484a06ad 100644 --- a/pkg/controller/component/vars_test.go +++ b/pkg/controller/component/vars_test.go @@ -331,16 +331,13 @@ var _ = Describe("vars", func() { Expect(err.Error()).Should(And(ContainSubstring("has no HostNetwork"), ContainSubstring("found when resolving vars"))) By("has no host-network port") - synthesizedComp.Annotations = map[string]string{ - constant.HostNetworkAnnotationKey: synthesizedComp.Name, - } + synthesizedComp.Network = &appsv1.ComponentNetwork{HostNetwork: true} _, _, err = ResolveTemplateNEnvVars(ctx, testCtx.Cli, synthesizedComp, vars) Expect(err).ShouldNot(Succeed()) Expect(err.Error()).Should(ContainSubstring("the required var is not found")) By("ok") - ctx := mockHostNetworkPort(testCtx.Ctx, testCtx.Cli, - synthesizedComp.ClusterName, synthesizedComp.Name, "default", "default", 30001) + synthesizedComp.Network.HostPorts = []appsv1.HostPort{{Name: "default", Port: 30001}} templateVars, envVars, err := ResolveTemplateNEnvVars(ctx, testCtx.Cli, synthesizedComp, vars) Expect(err).Should(Succeed()) Expect(templateVars).Should(HaveKeyWithValue("host-network-port", "30001")) @@ -375,7 +372,7 @@ var _ = Describe("vars", func() { checkEnvVarWithValue(envVars, "host-network-port", "30001") By("w/ default value - back-off to default value") - synthesizedComp.Annotations = nil // disable the host-network + synthesizedComp.Network = nil // disable the host-network templateVars, envVars, err = ResolveTemplateNEnvVars(testCtx.Ctx, testCtx.Cli, synthesizedComp, vars) Expect(err).Should(Succeed()) Expect(templateVars).Should(HaveKeyWithValue("host-network-port", "3306")) diff --git a/pkg/controllerutil/controller_common.go b/pkg/controllerutil/controller_common.go index 5e23bcb2ddb..092047e40bb 100644 --- a/pkg/controllerutil/controller_common.go +++ b/pkg/controllerutil/controller_common.go @@ -23,9 +23,7 @@ import ( "context" "fmt" "reflect" - "strconv" "strings" - "sync" "time" "github.com/go-logr/logr" @@ -34,7 +32,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,7 +40,6 @@ import ( appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" "github.com/apecloud/kubeblocks/pkg/constant" - viper "github.com/apecloud/kubeblocks/pkg/viperx" ) // ResultToP converts a Result object to a pointer. @@ -268,331 +264,3 @@ func CheckResourceExists( // if found, return true return true, nil } - -var ( - portManager *PortManager -) - -func InitHostPortManager(cli client.Client) error { - cm := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: viper.GetString(constant.CfgHostPortConfigMapName), - Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS), - }, - Data: make(map[string]string), - } - parsePortRange := func(item string) (int64, int64, error) { - parts := strings.Split(item, "-") - var ( - from int64 - to int64 - err error - ) - switch len(parts) { - case 2: - from, err = strconv.ParseInt(parts[0], 10, 32) - if err != nil { - return from, to, err - } - to, err = strconv.ParseInt(parts[1], 10, 32) - if err != nil { - return from, to, err - } - if from > to { - return from, to, fmt.Errorf("invalid port range %s", item) - } - case 1: - from, err = strconv.ParseInt(parts[0], 10, 32) - if err != nil { - return from, to, err - } - to = from - default: - return from, to, fmt.Errorf("invalid port range %s", item) - } - return from, to, nil - } - parsePortRanges := func(portRanges string) ([]PortRange, error) { - var ranges []PortRange - for _, item := range strings.Split(portRanges, ",") { - item = strings.TrimSpace(item) - if item == "" { - continue - } - from, to, err := parsePortRange(item) - if err != nil { - return nil, err - } - ranges = append(ranges, PortRange{ - Min: int32(from), - Max: int32(to), - }) - } - return ranges, nil - } - var err error - if err = cli.Create(context.Background(), cm); err != nil { - if !apierrors.IsAlreadyExists(err) { - return err - } - } - includes, err := parsePortRanges(viper.GetString(constant.CfgHostPortIncludeRanges)) - if err != nil { - return err - } - excludes, err := parsePortRanges(viper.GetString(constant.CfgHostPortExcludeRanges)) - if err != nil { - return err - } - portManager, err = NewPortManager(includes, excludes, cli) - return err -} - -func GetPortManager() *PortManager { - return portManager -} - -func BuildHostPortName(clusterName, compName, containerName, portName string) string { - return fmt.Sprintf("%s-%s-%s-%s", clusterName, compName, containerName, portName) -} - -type PortManager struct { - sync.Mutex - cli client.Client - from int32 - to int32 - cursor int32 - includes []PortRange - excludes []PortRange - used map[int32]string - cm *corev1.ConfigMap -} - -type PortRange struct { - Min int32 - Max int32 -} - -// NewPortManager creates a new PortManager -// TODO[ziang] Putting all the port information in one configmap may have performance issues and is not secure enough. -// There is a risk of accidental deletion leading to the loss of cluster port information. -func NewPortManager(includes []PortRange, excludes []PortRange, cli client.Client) (*PortManager, error) { - var ( - from int32 - to int32 - ) - for _, item := range includes { - if item.Min < from || from == 0 { - from = item.Min - } - if item.Max > to { - to = item.Max - } - } - pm := &PortManager{ - cli: cli, - from: from, - to: to, - cursor: from, - includes: includes, - excludes: excludes, - used: make(map[int32]string), - } - if err := pm.sync(); err != nil { - return nil, err - } - return pm, nil -} - -func (pm *PortManager) parsePort(port string) (int32, error) { - port = strings.TrimSpace(port) - if port == "" { - return 0, fmt.Errorf("port is empty") - } - p, err := strconv.ParseInt(port, 10, 32) - if err != nil { - return 0, err - } - return int32(p), nil -} - -func (pm *PortManager) sync() error { - cm := &corev1.ConfigMap{} - objKey := types.NamespacedName{ - Name: viper.GetString(constant.CfgHostPortConfigMapName), - Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS), - } - if err := pm.cli.Get(context.Background(), objKey, cm); err != nil { - return err - } - if cm.Data == nil { - cm.Data = make(map[string]string) - } - used := make(map[int32]string) - for key, item := range cm.Data { - port, err := pm.parsePort(item) - if err != nil { - continue - } - used[port] = key - } - for _, item := range pm.excludes { - for port := item.Min; port <= item.Max; port++ { - used[port] = "" - } - } - - pm.cm = cm - pm.used = used - return nil -} - -func (pm *PortManager) update(key string, port int32) error { - var err error - defer func() { - if apierrors.IsConflict(err) { - _ = pm.sync() - } - }() - cm := pm.cm.DeepCopy() - if cm.Data == nil { - cm.Data = make(map[string]string) - } - cm.Data[key] = fmt.Sprintf("%d", port) - err = pm.cli.Update(context.Background(), cm) - if err != nil { - return err - } - - pm.cm = cm - pm.used[port] = key - return nil -} - -func (pm *PortManager) delete(keys []string) error { - if pm.cm.Data == nil { - return nil - } - - var err error - defer func() { - if apierrors.IsConflict(err) { - _ = pm.sync() - } - }() - - cm := pm.cm.DeepCopy() - var ports []int32 - for _, key := range keys { - value, ok := cm.Data[key] - if !ok { - continue - } - port, err := pm.parsePort(value) - if err != nil { - return err - } - ports = append(ports, port) - delete(cm.Data, key) - } - err = pm.cli.Update(context.Background(), cm) - if err != nil { - return err - } - pm.cm = cm - for _, port := range ports { - delete(pm.used, port) - } - return nil -} - -func (pm *PortManager) GetPort(key string) (int32, error) { - pm.Lock() - defer pm.Unlock() - - if value, ok := pm.cm.Data[key]; ok { - port, err := pm.parsePort(value) - if err != nil { - return 0, err - } - return port, nil - } - return 0, nil -} - -func (pm *PortManager) UsePort(key string, port int32) error { - pm.Lock() - defer pm.Unlock() - if k, ok := pm.used[port]; ok && k != key { - return fmt.Errorf("port %d is used by %s", port, k) - } - if err := pm.update(key, port); err != nil { - return err - } - return nil -} - -func (pm *PortManager) AllocatePort(key string) (int32, error) { - pm.Lock() - defer pm.Unlock() - - if value, ok := pm.cm.Data[key]; ok { - port, err := pm.parsePort(value) - if err != nil { - return 0, err - } - return port, nil - } - - if len(pm.used) >= int(pm.to-pm.from)+1 { - return 0, fmt.Errorf("no available port") - } - - for { - if _, ok := pm.used[pm.cursor]; !ok { - break - } - pm.cursor++ - if pm.cursor > pm.to { - pm.cursor = pm.from - } - } - if err := pm.update(key, pm.cursor); err != nil { - return 0, err - } - return pm.cursor, nil -} - -func (pm *PortManager) ReleasePort(key string) error { - return pm.ReleasePorts([]string{key}) -} - -func (pm *PortManager) ReleasePorts(keys []string) error { - pm.Lock() - defer pm.Unlock() - for _, key := range keys { - if err := pm.delete([]string{key}); err != nil { - return err - } - } - return nil -} - -func (pm *PortManager) ReleaseByPrefix(prefix string) error { - if prefix == "" { - return nil - } - pm.Lock() - defer pm.Unlock() - - var keys []string - for key := range pm.cm.Data { - if strings.HasPrefix(key, prefix) { - keys = append(keys, key) - } - } - if err := pm.delete(keys); err != nil { - return err - } - return nil -} diff --git a/pkg/controllerutil/host_port_manager.go b/pkg/controllerutil/host_port_manager.go new file mode 100644 index 00000000000..f75c3d26085 --- /dev/null +++ b/pkg/controllerutil/host_port_manager.go @@ -0,0 +1,445 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package controllerutil + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/kbagent" + viper "github.com/apecloud/kubeblocks/pkg/viperx" +) + +type PortManager interface { + GetPort(key string) (int32, error) + UsePort(key string, port int32) error + AllocatePort(key string) (int32, error) + ReleaseByPrefix(prefix string) error + PortKey(clusterName, compName, containerName, portName string) string +} + +var ( + defaultPortManager PortManager +) + +func GetPortManager(network *appsv1.ComponentNetwork) PortManager { + if network == nil || !network.HostNetwork || len(network.HostPorts) == 0 { + return defaultPortManager + } + if defaultPortManager == nil { + return newDefinedPortManager(nil, network.HostPorts) + } + return newDefinedPortManager(defaultPortManager.(*portManager), network.HostPorts) +} + +func InitDefaultHostPortManager(cli client.Client) error { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: viper.GetString(constant.CfgHostPortConfigMapName), + Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS), + }, + Data: make(map[string]string), + } + parsePortRange := func(item string) (int64, int64, error) { + parts := strings.Split(item, "-") + var ( + from int64 + to int64 + err error + ) + switch len(parts) { + case 2: + from, err = strconv.ParseInt(parts[0], 10, 32) + if err != nil { + return from, to, err + } + to, err = strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return from, to, err + } + if from > to { + return from, to, fmt.Errorf("invalid port range %s", item) + } + case 1: + from, err = strconv.ParseInt(parts[0], 10, 32) + if err != nil { + return from, to, err + } + to = from + default: + return from, to, fmt.Errorf("invalid port range %s", item) + } + return from, to, nil + } + parsePortRanges := func(portRanges string) ([]portRange, error) { + var ranges []portRange + for _, item := range strings.Split(portRanges, ",") { + item = strings.TrimSpace(item) + if item == "" { + continue + } + from, to, err := parsePortRange(item) + if err != nil { + return nil, err + } + ranges = append(ranges, portRange{ + Min: int32(from), + Max: int32(to), + }) + } + return ranges, nil + } + var err error + if err = cli.Create(context.Background(), cm); err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + } + includes, err := parsePortRanges(viper.GetString(constant.CfgHostPortIncludeRanges)) + if err != nil { + return err + } + excludes, err := parsePortRanges(viper.GetString(constant.CfgHostPortExcludeRanges)) + if err != nil { + return err + } + defaultPortManager, err = newDefaultPortManager(includes, excludes, cli) + return err +} + +type portManager struct { + sync.Mutex + cli client.Client + from int32 + to int32 + cursor int32 + includes []portRange + excludes []portRange + used map[int32]string + cm *corev1.ConfigMap +} + +type portRange struct { + Min int32 + Max int32 +} + +// newDefaultPortManager creates a new default port manager +// TODO[ziang] Putting all the port information in one configmap may have performance issues and is not secure enough. +// There is a risk of accidental deletion leading to the loss of cluster port information. +func newDefaultPortManager(includes []portRange, excludes []portRange, cli client.Client) (*portManager, error) { + var ( + from int32 + to int32 + ) + for _, item := range includes { + if item.Min < from || from == 0 { + from = item.Min + } + if item.Max > to { + to = item.Max + } + } + pm := &portManager{ + cli: cli, + from: from, + to: to, + cursor: from, + includes: includes, + excludes: excludes, + used: make(map[int32]string), + } + if err := pm.sync(); err != nil { + return nil, err + } + return pm, nil +} + +func (m *portManager) PortKey(clusterName, compName, containerName, portName string) string { + return fmt.Sprintf("%s-%s-%s-%s", clusterName, compName, containerName, portName) +} + +func (m *portManager) parsePort(port string) (int32, error) { + port = strings.TrimSpace(port) + if port == "" { + return 0, fmt.Errorf("port is empty") + } + p, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, err + } + return int32(p), nil +} + +func (m *portManager) sync() error { + cm := &corev1.ConfigMap{} + objKey := types.NamespacedName{ + Name: viper.GetString(constant.CfgHostPortConfigMapName), + Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS), + } + if err := m.cli.Get(context.Background(), objKey, cm); err != nil { + return err + } + if cm.Data == nil { + cm.Data = make(map[string]string) + } + used := make(map[int32]string) + for key, item := range cm.Data { + port, err := m.parsePort(item) + if err != nil { + continue + } + used[port] = key + } + for _, item := range m.excludes { + for port := item.Min; port <= item.Max; port++ { + used[port] = "" + } + } + + m.cm = cm + m.used = used + return nil +} + +func (m *portManager) update(key string, port int32) error { + var err error + defer func() { + if apierrors.IsConflict(err) { + _ = m.sync() + } + }() + cm := m.cm.DeepCopy() + if cm.Data == nil { + cm.Data = make(map[string]string) + } + cm.Data[key] = fmt.Sprintf("%d", port) + err = m.cli.Update(context.Background(), cm) + if err != nil { + return err + } + + m.cm = cm + m.used[port] = key + return nil +} + +func (m *portManager) delete(keys []string) error { + if m.cm.Data == nil { + return nil + } + + var err error + defer func() { + if apierrors.IsConflict(err) { + _ = m.sync() + } + }() + + cm := m.cm.DeepCopy() + var ports []int32 + for _, key := range keys { + value, ok := cm.Data[key] + if !ok { + continue + } + port, err := m.parsePort(value) + if err != nil { + return err + } + ports = append(ports, port) + delete(cm.Data, key) + } + err = m.cli.Update(context.Background(), cm) + if err != nil { + return err + } + m.cm = cm + for _, port := range ports { + delete(m.used, port) + } + return nil +} + +func (m *portManager) GetPort(key string) (int32, error) { + m.Lock() + defer m.Unlock() + + if value, ok := m.cm.Data[key]; ok { + port, err := m.parsePort(value) + if err != nil { + return 0, err + } + return port, nil + } + return 0, nil +} + +func (m *portManager) UsePort(key string, port int32) error { + m.Lock() + defer m.Unlock() + if k, ok := m.used[port]; ok && k != key { + return fmt.Errorf("port %d is used by %s", port, k) + } + if err := m.update(key, port); err != nil { + return err + } + return nil +} + +func (m *portManager) AllocatePort(key string) (int32, error) { + m.Lock() + defer m.Unlock() + + if value, ok := m.cm.Data[key]; ok { + port, err := m.parsePort(value) + if err != nil { + return 0, err + } + return port, nil + } + + if len(m.used) >= int(m.to-m.from)+1 { + return 0, fmt.Errorf("no available port: %s", key) + } + + for { + if _, ok := m.used[m.cursor]; !ok { + break + } + m.cursor++ + if m.cursor > m.to { + m.cursor = m.from + } + } + if err := m.update(key, m.cursor); err != nil { + return 0, err + } + return m.cursor, nil +} + +func (m *portManager) ReleaseByPrefix(prefix string) error { + if prefix == "" { + return nil + } + m.Lock() + defer m.Unlock() + + var keys []string + for key := range m.cm.Data { + if strings.HasPrefix(key, prefix) { + keys = append(keys, key) + } + } + if len(keys) > 0 { + return m.delete(keys) + } + return nil +} + +type definedPortManager struct { + defaultPortManager *portManager + hostPorts map[string]int32 +} + +func (m *definedPortManager) PortKey(clusterName, compName, containerName, portName string) string { + if m.isKBAgentPortNNotDefined(containerName, portName) { + return m.defaultPortManager.PortKey(clusterName, compName, containerName, portName) + } + return portName +} + +func (m *definedPortManager) GetPort(key string) (int32, error) { + if m.isKBAgentPortNNotDefinedInKey(key) { + return m.defaultPortManager.GetPort(key) + } + return m.hostPorts[key], nil +} + +func (m *definedPortManager) UsePort(key string, port int32) error { + if m.isKBAgentPortNNotDefinedInKey(key) { + return m.defaultPortManager.UsePort(key, port) + } + return nil +} + +func (m *definedPortManager) AllocatePort(key string) (int32, error) { + port, ok := m.hostPorts[key] + if ok { + return port, nil + } + if m.isKBAgentPortNNotDefinedInKey(key) { + return m.defaultPortManager.AllocatePort(key) + } + return 0, fmt.Errorf("no available port: %s", key) + +} + +func (m *definedPortManager) ReleaseByPrefix(prefix string) error { + if m.hasKBAgentPortDefined() { + return nil + } + return m.defaultPortManager.ReleaseByPrefix(prefix) +} + +func (m *definedPortManager) isKBAgentPort(containerName, portName string) bool { + return containerName == kbagent.ContainerName && (portName == kbagent.DefaultHTTPPortName || portName == kbagent.DefaultStreamingPortName) +} + +func (m *definedPortManager) hasKBAgentPortDefined() bool { + _, http := m.hostPorts[kbagent.DefaultHTTPPortName] + _, stream := m.hostPorts[kbagent.DefaultStreamingPortName] + return http && stream +} + +func (m *definedPortManager) isKBAgentPortNNotDefined(containerName, portName string) bool { + _, defined := m.hostPorts[portName] + return m.isKBAgentPort(containerName, portName) && !defined +} + +func (m *definedPortManager) isKBAgentPortNNotDefinedInKey(key string) bool { + subs := strings.Split(key, "-") + if len(subs) < 4 { + return false + } + return m.isKBAgentPortNNotDefined(subs[len(subs)-2], subs[len(subs)-1]) +} + +func newDefinedPortManager(defaultPortManager *portManager, hostPorts []appsv1.HostPort) *definedPortManager { + hostPortsMap := make(map[string]int32) + for _, hp := range hostPorts { + hostPortsMap[hp.Name] = hp.Port + } + return &definedPortManager{ + defaultPortManager: defaultPortManager, + hostPorts: hostPortsMap, + } +} diff --git a/pkg/controllerutil/host_port_manager_test.go b/pkg/controllerutil/host_port_manager_test.go new file mode 100644 index 00000000000..34e381fb349 --- /dev/null +++ b/pkg/controllerutil/host_port_manager_test.go @@ -0,0 +1,311 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package controllerutil + +import ( + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/kbagent" + testutil "github.com/apecloud/kubeblocks/pkg/testutil/k8s" + viper "github.com/apecloud/kubeblocks/pkg/viperx" +) + +var _ = Describe("host port manager test", func() { + var ( + clusterName = "test-cluster" + compName = "comp" + containerName = "container" + portName = "app" + portNumber = int32(1234) + manager PortManager + ) + + Context("defined host-port manager", func() { + var ( + network = &appsv1.ComponentNetwork{ + HostNetwork: true, + HostPorts: []appsv1.HostPort{ + { + Name: portName, + Port: portNumber, + }, + { + Name: kbagent.DefaultHTTPPortName, + Port: kbagent.DefaultHTTPPort, + }, + { + Name: kbagent.DefaultStreamingPortName, + Port: kbagent.DefaultStreamingPort, + }, + }, + } + ) + + BeforeEach(func() { + defaultPortManager = nil + manager = GetPortManager(network) + }) + + AfterEach(func() { + }) + + It("port key", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + Expect(key).To(Equal(portName)) + }) + + It("port key - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + Expect(key).To(Equal(kbagent.DefaultHTTPPortName)) + }) + + It("allocate port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + port, err := manager.AllocatePort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(portNumber)) + }) + + It("allocate port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + port, err := manager.AllocatePort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(int32(kbagent.DefaultHTTPPort))) + }) + + It("allocate port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + _, err := manager.AllocatePort(key) + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(ContainSubstring("no available port")) + }) + + It("get port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(portNumber)) + }) + + It("get port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(int32(kbagent.DefaultHTTPPort))) + }) + + It("get port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(int32(0))) + }) + + It("use port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + err := manager.UsePort(key, portNumber) + Expect(err).Should(BeNil()) + }) + + It("use port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + err := manager.UsePort(key, kbagent.DefaultHTTPPort) + Expect(err).Should(BeNil()) + }) + + It("use port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + err := manager.UsePort(key, portNumber) + Expect(err).Should(BeNil()) + }) + + It("release port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + err := manager.ReleaseByPrefix(key) + Expect(err).Should(BeNil()) + }) + }) + + Context("defined host-port manager - w/o kbagent", func() { + var ( + mockClient *testutil.K8sClientMockHelper + network = &appsv1.ComponentNetwork{ + HostNetwork: true, + HostPorts: []appsv1.HostPort{ + { + Name: portName, + Port: portNumber, + }, + }, + } + minPort, maxPort = int32(1024), int32(65536) + dataCM = map[string]string{} + definedPortManagerInst *definedPortManager + ) + + BeforeEach(func() { + mockClient = testutil.NewK8sMockClient() + mockClient.MockCreateMethod(testutil.WithCreateReturned(func(obj client.Object) error { + dataCM = obj.(*corev1.ConfigMap).Data + return nil + }, testutil.WithAnyTimes())) + mockClient.MockGetMethod(testutil.WithGetReturned(testutil.WithConstructSimpleGetResult([]client.Object{ + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS), + Name: viper.GetString(constant.CfgHostPortConfigMapName), + }, + Data: dataCM, + }, + }), testutil.WithAnyTimes())) + mockClient.MockUpdateMethod(testutil.WithCreateReturned(func(obj client.Object) error { + dataCM = obj.(*corev1.ConfigMap).Data + return nil + }, testutil.WithAnyTimes())) + + viper.Set(constant.CfgHostPortIncludeRanges, fmt.Sprintf("%d-%d", minPort, maxPort)) + + err := InitDefaultHostPortManager(mockClient.Client()) + Expect(err).ShouldNot(HaveOccurred()) + + manager = GetPortManager(network) + Expect(manager).ShouldNot(BeNil()) + definedPortManagerInst = manager.(*definedPortManager) + }) + + AfterEach(func() { + mockClient.Finish() + }) + + It("port key", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + Expect(key).To(Equal(portName)) + }) + + It("port key - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + Expect(key).To(Equal(fmt.Sprintf("%s-%s-%s-%s", clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName))) + }) + + It("allocate port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + port, err := manager.AllocatePort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(portNumber)) + }) + + It("allocate port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + port, err := manager.AllocatePort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(minPort)) + }) + + It("allocate port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + _, err := manager.AllocatePort(key) + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(ContainSubstring("no available port")) + }) + + It("get port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(portNumber)) + }) + + It("get port - kbagent, not allocated", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(int32(0))) + }) + + It("get port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + allocated, err1 := manager.AllocatePort(key) + Expect(err1).Should(BeNil()) + port, err2 := manager.GetPort(key) + Expect(err2).Should(BeNil()) + Expect(port).Should(Equal(allocated)) + }) + + It("get port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + port, err := manager.GetPort(key) + Expect(err).Should(BeNil()) + Expect(port).Should(Equal(int32(0))) + }) + + It("use port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + err := manager.UsePort(key, portNumber) + Expect(err).Should(BeNil()) + Expect(definedPortManagerInst.hostPorts).Should(HaveKeyWithValue(key, portNumber)) + }) + + It("use port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + err := manager.UsePort(key, int32(kbagent.DefaultHTTPPort)) + Expect(err).Should(BeNil()) + Expect(definedPortManagerInst.hostPorts).ShouldNot(HaveKey(key)) + Expect(dataCM).Should(HaveKeyWithValue(key, fmt.Sprintf("%d", kbagent.DefaultHTTPPort))) + }) + + It("use port - not defined", func() { + errPortName := fmt.Sprintf("%s-not-defined", portName) + key := manager.PortKey(clusterName, compName, containerName, errPortName) + err := manager.UsePort(key, portNumber) + Expect(err).Should(BeNil()) + Expect(definedPortManagerInst.hostPorts).ShouldNot(HaveKey(key)) + }) + + It("release port", func() { + key := manager.PortKey(clusterName, compName, containerName, portName) + err := manager.ReleaseByPrefix(key) + Expect(err).Should(BeNil()) + }) + + It("release port - kbagent", func() { + key := manager.PortKey(clusterName, compName, kbagent.ContainerName, kbagent.DefaultHTTPPortName) + err := manager.UsePort(key, int32(kbagent.DefaultHTTPPort)) + Expect(err).Should(BeNil()) + Expect(dataCM).Should(HaveKeyWithValue(key, fmt.Sprintf("%d", kbagent.DefaultHTTPPort))) + err = manager.ReleaseByPrefix(key) + Expect(err).Should(BeNil()) + Expect(dataCM).Should(BeEmpty()) + }) + }) +}) diff --git a/pkg/parameters/config_util.go b/pkg/parameters/config_util.go index 0f8f1631058..75ab7c02a26 100644 --- a/pkg/parameters/config_util.go +++ b/pkg/parameters/config_util.go @@ -509,8 +509,8 @@ func ResolveReloadServerGRPCPort(containers []corev1.Container) (int32, error) { } func allocConfigManagerHostPort(comp *component.SynthesizedComponent) (int32, error) { - pm := intctrlutil.GetPortManager() - portKey := intctrlutil.BuildHostPortName(comp.ClusterName, comp.Name, constant.ConfigSidecarName, constant.ConfigManagerPortName) + pm := intctrlutil.GetPortManager(comp.Network) + portKey := pm.PortKey(comp.ClusterName, comp.Name, constant.ConfigSidecarName, constant.ConfigManagerPortName) port, err := pm.AllocatePort(portKey) if err != nil { return constant.InvalidContainerPort, err