diff --git a/README.md b/README.md index 8e06b492e..bca8daec3 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,8 @@ metadata: name: cluster spec: namespace: netobserv + networkPolicy: + enable: false consolePlugin: advanced: env: @@ -91,6 +93,7 @@ A few remarks: - While the [web console](https://github.com/netobserv/network-observability-console-plugin) is primarily designed as a plugin for the OpenShift Console, it is still possible to deploy it as a standalone, which the dev team sometimes use for testing. This is why it is mentioned as "TEST_CONSOLE" here. - If you're in OpenShift, you should omit "TEST_CONSOLE: true" to use the Console plugin instead, which offers a better / more integrated experience. - You can change the Prometheus and Loki URLs depending on your installation. This example works if you use the "standalone" installation described above, with `install.loki=true` and `install.prom=true`. Check more configuration options for [Prometheus](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecprometheus-1) and [Loki](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecloki-1). +- You can enable networkPolicy, which makes the operator lock down the namespaces that it manages; however, this is highly dependent on your cluster topology, and may cause malfunctions, such as preventing NetObserv pods from communicating with the Kube API server. 
To view the test console, you can port-forward 9001: diff --git a/api/flowcollector/v1beta2/flowcollector_types.go b/api/flowcollector/v1beta2/flowcollector_types.go index 4d925a899..679af3934 100644 --- a/api/flowcollector/v1beta2/flowcollector_types.go +++ b/api/flowcollector/v1beta2/flowcollector_types.go @@ -27,8 +27,9 @@ import ( type FlowCollectorDeploymentModel string const ( - DeploymentModelDirect FlowCollectorDeploymentModel = "Direct" - DeploymentModelKafka FlowCollectorDeploymentModel = "Kafka" + DeploymentModelDirect FlowCollectorDeploymentModel = "Direct" + DeploymentModelKafka FlowCollectorDeploymentModel = "Kafka" + DeploymentModelService FlowCollectorDeploymentModel = "Service" ) // Please notice that the FlowCollectorSpec's properties MUST redefine one of the default @@ -69,11 +70,13 @@ type FlowCollectorSpec struct { ConsolePlugin FlowCollectorConsolePlugin `json:"consolePlugin,omitempty"` // `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:
- // - `Direct` (default) to make the flow processor listen directly from the agents.
+ // - `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.
+ // - `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.
// - `Kafka` to make flows sent to a Kafka pipeline before consumption by the processor.
- // Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka). + // Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+ // `Direct` is not recommended on large clusters as it is less memory efficient. // +unionDiscriminator - // +kubebuilder:validation:Enum:="Direct";"Kafka" + // +kubebuilder:validation:Enum:="Direct";"Service";"Kafka" // +kubebuilder:default:=Direct DeploymentModel FlowCollectorDeploymentModel `json:"deploymentModel,omitempty"` @@ -626,15 +629,26 @@ type FlowCollectorFLP struct { //+kubebuilder:validation:Minimum=0 //+kubebuilder:default:=3 - // `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + // `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. // This setting is ignored when Kafka is disabled. + // Deprecation notice: use `spec.processor.consumerReplicas` instead. KafkaConsumerReplicas *int32 `json:"kafkaConsumerReplicas,omitempty"` - // `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + // `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. // This setting is ignored when Kafka is disabled. + // Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`. // +optional KafkaConsumerAutoscaler FlowCollectorHPA `json:"kafkaConsumerAutoscaler,omitempty"` + //+kubebuilder:validation:Minimum=0 + // `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`. + // This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`. 
+ ConsumerReplicas *int32 `json:"consumerReplicas,omitempty"` + + // If `unmanagedReplicas` is `true`, the operator will not reconcile `consumerReplicas`. This is useful when using a pod autoscaler. + // +optional + UnmanagedReplicas bool `json:"unmanagedReplicas,omitempty"` + //+kubebuilder:default:=1000 // +optional // `kafkaConsumerQueueCapacity` defines the capacity of the internal message queue used in the Kafka consumer client. Ignored when not using Kafka. @@ -1025,6 +1039,10 @@ type FlowCollectorConsolePlugin struct { // `replicas` defines the number of replicas (pods) to start. Replicas *int32 `json:"replicas,omitempty"` + // If `unmanagedReplicas` is `true`, the operator will not reconcile `replicas`. This is useful when using a pod autoscaler. + // +optional + UnmanagedReplicas bool `json:"unmanagedReplicas,omitempty"` + //+kubebuilder:validation:Enum=IfNotPresent;Always;Never //+kubebuilder:default:=IfNotPresent // `imagePullPolicy` is the Kubernetes pull policy for the image defined above @@ -1041,7 +1059,8 @@ type FlowCollectorConsolePlugin struct { // `logLevel` for the console plugin backend LogLevel string `json:"logLevel,omitempty"` - // `autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment. + // `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. + // Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`. 
// +optional Autoscaler FlowCollectorHPA `json:"autoscaler,omitempty"` diff --git a/api/flowcollector/v1beta2/flowcollector_validation_webhook.go b/api/flowcollector/v1beta2/flowcollector_validation_webhook.go index bb9eabcef..12b67bfd7 100644 --- a/api/flowcollector/v1beta2/flowcollector_validation_webhook.go +++ b/api/flowcollector/v1beta2/flowcollector_validation_webhook.go @@ -240,11 +240,16 @@ func (v *validator) validateScheduling() { } func (v *validator) validateFLPLogTypes() { - if v.fc.Processor.LogTypes != nil && *v.fc.Processor.LogTypes == LogTypeAll { - v.warnings = append(v.warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint") - } - if v.fc.Processor.LogTypes != nil && *v.fc.Processor.LogTypes != LogTypeFlows && v.fc.Loki.Enable != nil && !*v.fc.Loki.Enable { - v.errors = append(v.errors, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit")) + if v.fc.Processor.HasConntrack() { + if *v.fc.Processor.LogTypes == LogTypeAll { + v.warnings = append(v.warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint") + } + if !v.fc.UseLoki() { + v.errors = append(v.errors, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit")) + } + if v.fc.DeploymentModel == DeploymentModelService { + v.errors = append(v.errors, errors.New("cannot enable conversation tracking when spec.deploymentModel is Service: you must disable it, or change the deployment model")) + } } } diff --git a/api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go b/api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go index abf365f7f..1ece6df19 100644 --- a/api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go +++ b/api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go @@ -509,6 +509,21 @@ func TestValidateConntrack(t 
*testing.T) { }, expectedError: "enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit", }, + { + name: "Conntrack not allowed with deploymentModel Service", + fc: &FlowCollector{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Spec: FlowCollectorSpec{ + DeploymentModel: DeploymentModelService, + Processor: FlowCollectorFLP{ + LogTypes: ptr.To(LogTypeConversations), + }, + }, + }, + expectedError: "cannot enable conversation tracking when spec.deploymentModel is Service: you must disable it, or change the deployment model", + }, } r := FlowCollector{} diff --git a/api/flowcollector/v1beta2/helper.go b/api/flowcollector/v1beta2/helper.go index bbd5a93b5..1898eb0fd 100644 --- a/api/flowcollector/v1beta2/helper.go +++ b/api/flowcollector/v1beta2/helper.go @@ -33,10 +33,6 @@ func (spec *FlowCollectorSpec) HasKafkaExporter() bool { return false } -func (spec *FlowCollectorHPA) HPAEnabled() bool { - return spec != nil && spec.Status == HPAStatusEnabled -} - func (cfg *SASLConfig) UseSASL() bool { return cfg.Type == SASLPlain || cfg.Type == SASLScramSHA512 } @@ -67,6 +63,10 @@ func (spec *FlowCollectorSpec) UseTestConsolePlugin() bool { return false } +func (spec *FlowCollectorSpec) UseHostNetwork() bool { + return spec.DeploymentModel == DeploymentModelDirect +} + func (spec *FlowCollectorEBPF) IsAgentFeatureEnabled(feature AgentFeature) bool { for _, f := range spec.Features { if f == feature { @@ -196,3 +196,30 @@ func (spec *FlowCollectorSpec) HasExperimentalAlertsHealth() bool { func (spec *FlowCollectorSpec) DeployNetworkPolicy() bool { return spec.NetworkPolicy.Enable != nil && *spec.NetworkPolicy.Enable } + +func (spec *FlowCollectorFLP) GetFLPReplicas() int32 { + if spec.ConsumerReplicas != nil { + return *spec.ConsumerReplicas + } else if spec.KafkaConsumerReplicas != nil { + return *spec.KafkaConsumerReplicas + } + return 3 +} + +func (spec *FlowCollectorHPA) IsHPAEnabled() bool { + return spec != 
nil && spec.Status == HPAStatusEnabled +} + +func (spec *FlowCollectorFLP) IsUnmanagedFLPReplicas() bool { + if spec.UnmanagedReplicas { + return true + } + return spec.KafkaConsumerAutoscaler.IsHPAEnabled() +} + +func (spec *FlowCollectorConsolePlugin) IsUnmanagedConsolePluginReplicas() bool { + if spec.UnmanagedReplicas { + return true + } + return spec.Autoscaler.IsHPAEnabled() +} diff --git a/api/flowcollector/v1beta2/zz_generated.deepcopy.go b/api/flowcollector/v1beta2/zz_generated.deepcopy.go index 612a9c178..cbc20a6cf 100644 --- a/api/flowcollector/v1beta2/zz_generated.deepcopy.go +++ b/api/flowcollector/v1beta2/zz_generated.deepcopy.go @@ -692,6 +692,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) { **out = **in } in.KafkaConsumerAutoscaler.DeepCopyInto(&out.KafkaConsumerAutoscaler) + if in.ConsumerReplicas != nil { + in, out := &in.ConsumerReplicas, &out.ConsumerReplicas + *out = new(int32) + **out = **in + } if in.LogTypes != nil { in, out := &in.LogTypes, &out.LogTypes *out = new(FLPLogTypes) diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index e7055279d..e6f2070a8 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -2749,8 +2749,9 @@ spec: type: object type: object autoscaler: - description: '`autoscaler` spec of a horizontal pod autoscaler - to set up for the plugin Deployment.' + description: |- + `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. + Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`. 
properties: maxReplicas: default: 3 @@ -3210,16 +3211,23 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will + not reconcile `replicas`. This is useful when using a pod autoscaler. + type: boolean type: object deploymentModel: default: Direct description: |- `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:
- - `Direct` (default) to make the flow processor listen directly from the agents.
+ - `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.
+ - `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.
- `Kafka` to make flows sent to a Kafka pipeline before consumption by the processor.
- Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka). + Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+ `Direct` is not recommended on large clusters as it is less memory efficient. enum: - Direct + - Service - Kafka type: string exporters: @@ -5367,6 +5375,13 @@ spec: in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.' type: string + consumerReplicas: + description: |- + `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`. + This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`. + format: int32 + minimum: 0 + type: integer deduper: description: '`deduper` allows you to sample or drop flows identified as duplicates, in order to save on resource usage.' @@ -5435,8 +5450,9 @@ spec: type: string kafkaConsumerAutoscaler: description: |- - `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`. properties: maxReplicas: default: 3 @@ -5750,8 +5766,9 @@ spec: kafkaConsumerReplicas: default: 3 description: |- - `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: use `spec.processor.consumerReplicas` instead. 
format: int32 minimum: 0 type: integer @@ -6126,6 +6143,11 @@ spec: external traffic: flows that are not labeled for those subnets are external to the cluster. Enabled by default on OpenShift. type: boolean type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will + not reconcile `consumerReplicas`. This is useful when using + a pod autoscaler. + type: boolean type: object prometheus: description: '`prometheus` defines Prometheus settings, such as querier diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml index 6a3c914de..586e4ad0b 100644 --- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml +++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml @@ -568,6 +568,8 @@ spec: path: consolePlugin.portNaming.enable - displayName: Port names path: consolePlugin.portNaming.portNames + - displayName: Unmanaged replicas + path: consolePlugin.unmanagedReplicas - displayName: Address path: kafka.address - displayName: Topic @@ -606,6 +608,8 @@ spec: path: networkPolicy.additionalNamespaces - displayName: Enable path: networkPolicy.enable + - displayName: Consumer replicas + path: processor.consumerReplicas - displayName: Deduper path: processor.deduper - displayName: Mode @@ -628,6 +632,8 @@ spec: path: processor.subnetLabels - displayName: Custom labels path: processor.subnetLabels.customLabels + - displayName: Unmanaged replicas + path: processor.unmanagedReplicas - displayName: Prometheus path: prometheus - displayName: Querier @@ -726,7 +732,7 @@ spec: - Quick filters (`spec.consolePlugin.quickFilters`): configure preset filters to be displayed in the Console plugin. They offer a way to quickly switch from filters to others, such as showing / hiding pods network, or infrastructure network, or application network, etc. They can be tuned to reflect the different workloads running on your cluster. 
For a list of available filters, [check this page](https://github.com/netobserv/network-observability-operator/blob/1.9.2-community/docs/QuickFilters.md). - - Kafka (`spec.deploymentModel: KAFKA` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. + - Kafka (`spec.deploymentModel: Kafka` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. - Exporters (`spec.exporters`) an optional list of exporters to which to send enriched flows. KAFKA and IPFIX exporters are supported. This allows you to define any custom storage or processing that can read from Kafka or use the IPFIX standard. diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index f2417e2ee..c585fa6c8 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -2555,7 +2555,9 @@ spec: type: object type: object autoscaler: - description: '`autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment.' + description: |- + `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. + Deprecation notice: managed autoscaler will be removed in a future version. 
You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`. properties: maxReplicas: default: 3 @@ -3004,16 +3006,22 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will not reconcile `replicas`. This is useful when using a pod autoscaler. + type: boolean type: object deploymentModel: default: Direct description: |- `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:
- - `Direct` (default) to make the flow processor listen directly from the agents.
+ - `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.
+ - `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.
- `Kafka` to make flows sent to a Kafka pipeline before consumption by the processor.
- Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka). + Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+ `Direct` is not recommended on large clusters as it is less memory efficient. enum: - Direct + - Service - Kafka type: string exporters: @@ -4946,6 +4954,13 @@ spec: default: "" description: '`clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.' type: string + consumerReplicas: + description: |- + `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`. + This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`. + format: int32 + minimum: 0 + type: integer deduper: description: '`deduper` allows you to sample or drop flows identified as duplicates, in order to save on resource usage.' properties: @@ -5004,8 +5019,9 @@ spec: type: string kafkaConsumerAutoscaler: description: |- - `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`. properties: maxReplicas: default: 3 @@ -5312,8 +5328,9 @@ spec: kafkaConsumerReplicas: default: 3 description: |- - `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: use `spec.processor.consumerReplicas` instead. 
format: int32 minimum: 0 type: integer @@ -5661,6 +5678,9 @@ spec: external traffic: flows that are not labeled for those subnets are external to the cluster. Enabled by default on OpenShift. type: boolean type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will not reconcile `consumerReplicas`. This is useful when using a pod autoscaler. + type: boolean type: object prometheus: description: '`prometheus` defines Prometheus settings, such as querier configuration used to fetch metrics from the Console plugin.' diff --git a/config/descriptions/ocp.md b/config/descriptions/ocp.md index 20b9b4c30..fa1ecb322 100644 --- a/config/descriptions/ocp.md +++ b/config/descriptions/ocp.md @@ -56,7 +56,7 @@ A couple of settings deserve special attention: - Quick filters (`spec.consolePlugin.quickFilters`): configure preset filters to be displayed in the Console plugin. They offer a way to quickly switch from filters to others, such as showing / hiding pods network, or infrastructure network, or application network, etc. They can be tuned to reflect the different workloads running on your cluster. For a list of available filters, [check this page](https://github.com/netobserv/network-observability-operator/blob/1.9.2-community/docs/QuickFilters.md). -- Kafka (`spec.deploymentModel: KAFKA` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. +- Kafka (`spec.deploymentModel: Kafka` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). 
Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. - Exporters (`spec.exporters`) an optional list of exporters to which to send enriched flows. KAFKA and IPFIX exporters are supported. This allows you to define any custom storage or processing that can read from Kafka or use the IPFIX standard. diff --git a/config/descriptions/upstream.md b/config/descriptions/upstream.md index 27162f6ae..c51964f6e 100644 --- a/config/descriptions/upstream.md +++ b/config/descriptions/upstream.md @@ -60,7 +60,7 @@ A couple of settings deserve special attention: - Quick filters (`spec.consolePlugin.quickFilters`): configure preset filters to be displayed in the Console plugin. They offer a way to quickly switch from filters to others, such as showing / hiding pods network, or infrastructure network, or application network, etc. They can be tuned to reflect the different workloads running on your cluster. For a list of available filters, [check this page](https://github.com/netobserv/network-observability-operator/blob/1.9.2-community/docs/QuickFilters.md). -- Kafka (`spec.deploymentModel: KAFKA` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. +- Kafka (`spec.deploymentModel: Kafka` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). 
Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created. - Exporters (`spec.exporters`) an optional list of exporters to which to send enriched flows. KAFKA and IPFIX exporters are supported. This allows you to define any custom storage or processing that can read from Kafka or use the IPFIX standard. diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index 9109e1a5c..723ad6096 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -112,11 +112,13 @@ for these features as a best effort only. enum `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:
-- `Direct` (default) to make the flow processor listen directly from the agents.
+- `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.
+- `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.
- `Kafka` to make flows sent to a Kafka pipeline before consumption by the processor.
-Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+`Direct` is not recommended on large clusters as it is less memory efficient.

- Enum: Direct, Kafka
+ Enum: Direct, Service, Kafka
Default: Direct
false @@ -3006,7 +3008,8 @@ such as `GOGC` and `GOMAXPROCS` environment variables. Set these values at your autoscaler object - `autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment.
+ `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. +Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`.
false @@ -3077,6 +3080,13 @@ For more information, see https://kubernetes.io/docs/concepts/configuration/mana Default: map[limits:map[memory:100Mi] requests:map[cpu:100m memory:50Mi]]
false + + unmanagedReplicas + boolean + + If `unmanagedReplicas` is `true`, the operator will not reconcile `replicas`. This is useful when using a pod autoscaler.
+ + false @@ -4897,7 +4907,8 @@ If the operator is Exists, the value should be empty, otherwise just a regular s -`autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment. +`autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. +Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`. @@ -8401,6 +8412,17 @@ such as `GOGC` and `GOMAXPROCS` environment variables. Set these values at your Default:
+ + + + + @@ -8431,8 +8453,9 @@ but with a lesser improvement in performance.
@@ -8457,8 +8480,9 @@ This setting is ignored when Kafka is disabled.
+ + + + +
false
consumerReplicasinteger + `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`. +This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`.
+
+ Format: int32
+ Minimum: 0
+
false
deduper object kafkaConsumerAutoscaler object - `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. -This setting is ignored when Kafka is disabled.
+ `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. +This setting is ignored when Kafka is disabled. +Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`.
false
kafkaConsumerReplicas integer - `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. -This setting is ignored when Kafka is disabled.
+ `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. +This setting is ignored when Kafka is disabled. +Deprecation notice: use `spec.processor.consumerReplicas` instead.

Format: int32
Default: 3
@@ -8523,6 +8547,13 @@ For more information, see https://kubernetes.io/docs/concepts/configuration/mana When a subnet matches the source or destination IP of a flow, a corresponding field is added: `SrcSubnetLabel` or `DstSubnetLabel`.
false
unmanagedReplicasboolean + If `unmanagedReplicas` is `true`, the operator will not reconcile `consumerReplicas`. This is useful when using a pod autoscaler.
+
false
@@ -10528,8 +10559,9 @@ Fields absent from the 'k8s.v1.cni.cncf.io/network-status' annotation must not b -`kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. +`kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. +Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`. diff --git a/helm/README.md b/helm/README.md index 30877f93b..950b50e84 100644 --- a/helm/README.md +++ b/helm/README.md @@ -65,6 +65,8 @@ metadata: name: cluster spec: namespace: netobserv + networkPolicy: + enable: false consolePlugin: advanced: env: @@ -84,6 +86,7 @@ A few remarks: - While the [web console](https://github.com/netobserv/network-observability-console-plugin) is primarily designed as a plugin for the OpenShift Console, it is still possible to deploy it as a standalone, which the dev team sometimes use for testing. This is why it is mentioned as "TEST_CONSOLE" here. - If you're in OpenShift, you should omit "TEST_CONSOLE: true" to use the Console plugin instead, which offers a better / more integrated experience. - You can change the Prometheus and Loki URLs depending on your installation. This example works if you use the "standalone" installation described above, with `install.loki=true` and `install.prom=true`. Check more configuration options for [Prometheus](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecprometheus-1) and [Loki](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecloki-1). 
+- You can enable networkPolicy, which makes the operator lock down the namespaces that it manages; however, this is highly dependent on your cluster topology, and may cause malfunctions, such as preventing NetObserv pods from communicating with the Kube API server. To view the test console, you can port-forward 9001: diff --git a/helm/crds/flows.netobserv.io_flowcollectors.yaml b/helm/crds/flows.netobserv.io_flowcollectors.yaml index 738a3e9ae..ad7819db1 100644 --- a/helm/crds/flows.netobserv.io_flowcollectors.yaml +++ b/helm/crds/flows.netobserv.io_flowcollectors.yaml @@ -2559,7 +2559,9 @@ spec: type: object type: object autoscaler: - description: '`autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment.' + description: |- + `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment. + Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.consolePlugin.unmanagedReplicas` to `true`. properties: maxReplicas: default: 3 @@ -3008,16 +3010,22 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will not reconcile `replicas`. This is useful when using a pod autoscaler. + type: boolean type: object deploymentModel: default: Direct description: |- `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:
- - `Direct` (default) to make the flow processor listen directly from the agents.
+ - `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.
+ - `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.
- `Kafka` to make flows sent to a Kafka pipeline before consumption by the processor.
- Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka). + Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
+ `Direct` is not recommended on large clusters as it is less memory efficient. enum: - Direct + - Service - Kafka type: string exporters: @@ -4950,6 +4958,13 @@ spec: default: "" description: '`clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.' type: string + consumerReplicas: + description: |- + `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`. + This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`. + format: int32 + minimum: 0 + type: integer deduper: description: '`deduper` allows you to sample or drop flows identified as duplicates, in order to save on resource usage.' properties: @@ -5008,8 +5023,9 @@ spec: type: string kafkaConsumerAutoscaler: description: |- - `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: managed autoscaler will be removed in a future version. You may configure instead an autoscaler of your choice, and set `spec.processor.unmanagedReplicas` to `true`. properties: maxReplicas: default: 3 @@ -5316,8 +5332,9 @@ spec: kafkaConsumerReplicas: default: 3 description: |- - `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. + `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages. This setting is ignored when Kafka is disabled. + Deprecation notice: use `spec.processor.consumerReplicas` instead. 
format: int32 minimum: 0 type: integer @@ -5665,6 +5682,9 @@ spec: external traffic: flows that are not labeled for those subnets are external to the cluster. Enabled by default on OpenShift. type: boolean type: object + unmanagedReplicas: + description: If `unmanagedReplicas` is `true`, the operator will not reconcile `consumerReplicas`. This is useful when using a pod autoscaler. + type: boolean type: object prometheus: description: '`prometheus` defines Prometheus settings, such as querier configuration used to fetch metrics from the Console plugin.' diff --git a/internal/controller/consoleplugin/consoleplugin_reconciler.go b/internal/controller/consoleplugin/consoleplugin_reconciler.go index eaab04b52..87698b80f 100644 --- a/internal/controller/consoleplugin/consoleplugin_reconciler.go +++ b/internal/controller/consoleplugin/consoleplugin_reconciler.go @@ -206,8 +206,7 @@ func (r *CPReconciler) reconcileDeployment(ctx context.Context, builder *builder r.deployment, builder.deployment(name, cmDigest), name, - helper.PtrInt32(desired.ConsolePlugin.Replicas), - &desired.ConsolePlugin.Autoscaler, + desired.ConsolePlugin.UnmanagedReplicas, &report, ) } diff --git a/internal/controller/constants/constants.go b/internal/controller/constants/constants.go index 8dfa31b3e..09d0a2a58 100644 --- a/internal/controller/constants/constants.go +++ b/internal/controller/constants/constants.go @@ -50,11 +50,14 @@ const ( ClusterNameLabelName = "K8S_ClusterName" - MonitoringNamespace = "openshift-monitoring" - MonitoringServiceAccount = "prometheus-k8s" - UWMonitoringNamespace = "openshift-user-workload-monitoring" - ConsoleNamespace = "openshift-console" - DNSNamespace = "openshift-dns" + KubeSystemNamespace = "kube-system" + OpenShiftAPIServerNamespace = "openshift-apiserver" + OpenShiftKubeAPIServerNamespace = "openshift-kube-apiserver" + MonitoringNamespace = "openshift-monitoring" + MonitoringServiceAccount = "prometheus-k8s" + UWMonitoringNamespace = 
"openshift-user-workload-monitoring" + ConsoleNamespace = "openshift-console" + DNSNamespace = "openshift-dns" // [Cluster]Roles, must match names in config/rbac/component_roles.yaml (without netobserv- prefix) LokiWriterRole ClusterRoleName = "netobserv-loki-writer" diff --git a/internal/controller/ebpf/agent_controller.go b/internal/controller/ebpf/agent_controller.go index f904ad23c..ab5fb52ef 100644 --- a/internal/controller/ebpf/agent_controller.go +++ b/internal/controller/ebpf/agent_controller.go @@ -386,7 +386,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol Spec: corev1.PodSpec{ // Allows deploying an instance in the master node ServiceAccountName: constants.EBPFServiceAccount, - HostNetwork: true, + HostNetwork: true, // HostNetwork needed for TC programs, regardless of the connection with FLP DNSPolicy: corev1.DNSClusterFirstWithHostNet, Volumes: volumes, Containers: []corev1.Container{{ @@ -465,20 +465,31 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC } else { config = append(config, corev1.EnvVar{Name: envExport, Value: exportGRPC}) advancedConfig := helper.GetAdvancedProcessorConfig(&coll.Spec) - // When flowlogs-pipeline is deployed as a daemonset, each agent must send - // data to the pod that is deployed in the same host - config = append(config, corev1.EnvVar{ - Name: envFlowsTargetHost, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "status.hostIP", + if coll.Spec.UseHostNetwork() { + // When flowlogs-pipeline is deployed as a daemonset, each agent must send + // data to the pod that is deployed in the same host + config = append(config, corev1.EnvVar{ + Name: envFlowsTargetHost, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.hostIP", + }, }, - }, - }, corev1.EnvVar{ - Name: envFlowsTargetPort, - Value: strconv.Itoa(int(*advancedConfig.Port)), 
- }) + }, corev1.EnvVar{ + Name: envFlowsTargetPort, + Value: strconv.Itoa(int(*advancedConfig.Port)), + }) + } else { + // Send to FLP service + config = append(config, corev1.EnvVar{ + Name: envFlowsTargetHost, + Value: fmt.Sprintf("%s.%s.svc", constants.FLPName, c.Namespace), + }, corev1.EnvVar{ + Name: envFlowsTargetPort, + Value: strconv.Itoa(int(*advancedConfig.Port)), + }) + } } return config, nil diff --git a/internal/controller/flowcollector_controller_iso_test.go b/internal/controller/flowcollector_controller_iso_test.go index 698b9ce73..0c4f2b5a8 100644 --- a/internal/controller/flowcollector_controller_iso_test.go +++ b/internal/controller/flowcollector_controller_iso_test.go @@ -62,7 +62,8 @@ func flowCollectorIsoSpecs() { ImagePullPolicy: "Always", LogLevel: "trace", Resources: v1.ResourceRequirements{Limits: nil, Requests: nil}, - KafkaConsumerReplicas: &zero, + KafkaConsumerReplicas: ptr.To(int32(3)), + ConsumerReplicas: ptr.To(int32(3)), KafkaConsumerAutoscaler: flowslatest.FlowCollectorHPA{Status: "Disabled", MinReplicas: &zero, MaxReplicas: zero, Metrics: []ascv2.MetricSpec{}}, KafkaConsumerQueueCapacity: int(zero), KafkaConsumerBatchSize: int(zero), diff --git a/internal/controller/flp/flp_common_objects.go b/internal/controller/flp/flp_common_objects.go index 5834b4a29..417ff18a5 100644 --- a/internal/controller/flp/flp_common_objects.go +++ b/internal/controller/flp/flp_common_objects.go @@ -73,24 +73,41 @@ func getPromTLS(desired *flowslatest.FlowCollectorSpec, serviceName string) (*fl return promTLS, nil } +type flowNetworkType int + +const ( + hostNetwork flowNetworkType = iota + hostPort + svc + pull +) + func podTemplate( appName, version, imageName, cmName string, desired *flowslatest.FlowCollectorSpec, vols *volumes.Builder, - hasHostPort, hostNetwork bool, + netType flowNetworkType, annotations map[string]string, ) corev1.PodTemplateSpec { advancedConfig := helper.GetAdvancedProcessorConfig(desired) var ports []corev1.ContainerPort - 
if hasHostPort { + switch netType { + case hostNetwork, hostPort: ports = []corev1.ContainerPort{{ Name: constants.FLPPortName, HostPort: *advancedConfig.Port, ContainerPort: *advancedConfig.Port, Protocol: corev1.ProtocolTCP, }} + case svc: + ports = []corev1.ContainerPort{{ + Name: constants.FLPPortName, + ContainerPort: *advancedConfig.Port, + Protocol: corev1.ProtocolTCP, + }} + case pull: + // does not listen for flows => no port } - ports = append(ports, corev1.ContainerPort{ Name: healthPortName, ContainerPort: *advancedConfig.HealthPort, @@ -166,7 +183,7 @@ func podTemplate( } } dnsPolicy := corev1.DNSClusterFirst - if hostNetwork { + if netType == hostNetwork { dnsPolicy = corev1.DNSClusterFirstWithHostNet } annotations["prometheus.io/scrape"] = "true" @@ -184,7 +201,7 @@ func podTemplate( Volumes: volumes, Containers: []corev1.Container{container}, ServiceAccountName: appName, - HostNetwork: hostNetwork, + HostNetwork: netType == hostNetwork, DNSPolicy: dnsPolicy, NodeSelector: advancedConfig.Scheduling.NodeSelector, Tolerations: advancedConfig.Scheduling.Tolerations, diff --git a/internal/controller/flp/flp_monolith_objects.go b/internal/controller/flp/flp_monolith_objects.go index 3c16c5e84..d9926fa94 100644 --- a/internal/controller/flp/flp_monolith_objects.go +++ b/internal/controller/flp/flp_monolith_objects.go @@ -1,10 +1,6 @@ package flp import ( - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2" metricslatest "github.com/netobserv/network-observability-operator/api/flowmetrics/v1alpha1" "github.com/netobserv/network-observability-operator/internal/controller/constants" @@ -12,6 +8,10 @@ import ( "github.com/netobserv/network-observability-operator/internal/pkg/helper" "github.com/netobserv/network-observability-operator/internal/pkg/volumes" monitoringv1 
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) const ( @@ -50,6 +50,10 @@ func newMonolithBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCol } func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.DaemonSet { + netType := hostNetwork + if b.info.ClusterInfo.IsOpenShift() { + netType = hostPort + } pod := podTemplate( monoName, b.version, @@ -57,8 +61,7 @@ func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.Daemo monoConfigMap, b.desired, &b.volumes, - true, /*listens*/ - !b.info.ClusterInfo.IsOpenShift(), + netType, annotations, ) return &appsv1.DaemonSet{ @@ -80,6 +83,38 @@ func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.Daemo } } +func (b *monolithBuilder) deployment(annotations map[string]string) *appsv1.Deployment { + pod := podTemplate( + monoName, + b.version, + b.info.Images[reconcilers.MainImage], + monoConfigMap, + b.desired, + &b.volumes, + svc, + annotations, + ) + replicas := b.desired.Processor.GetFLPReplicas() + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: monoName, + Namespace: b.info.Namespace, + Labels: map[string]string{ + "part-of": constants.OperatorName, + "app": monoName, + "version": b.version, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": monoName}, + }, + Template: pod, + }, + } +} + func (b *monolithBuilder) configMaps() (*corev1.ConfigMap, string, *corev1.ConfigMap, error) { kafkaStage := newGRPCPipeline(b.desired) pipeline := newPipelineBuilder( @@ -119,6 +154,32 @@ func (b *monolithBuilder) configMaps() (*corev1.ConfigMap, string, *corev1.Confi return staticCM, digest, dynamicCM, err } +func (b *monolithBuilder) service() *corev1.Service { + advancedConfig := 
helper.GetAdvancedProcessorConfig(b.desired) + port := *advancedConfig.Port + svc := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: monoName, + Namespace: b.info.Namespace, + Labels: map[string]string{ + "part-of": constants.OperatorName, + "app": monoName, + "version": b.version, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": monoName}, + Ports: []corev1.ServicePort{{ + Name: constants.FLPPortName, + Port: port, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(port), + }}, + }, + } + return &svc +} + func (b *monolithBuilder) promService() *corev1.Service { return promService( b.desired, diff --git a/internal/controller/flp/flp_monolith_reconciler.go b/internal/controller/flp/flp_monolith_reconciler.go index 3a6be8ce0..843872e1a 100644 --- a/internal/controller/flp/flp_monolith_reconciler.go +++ b/internal/controller/flp/flp_monolith_reconciler.go @@ -23,6 +23,8 @@ import ( type monolithReconciler struct { *reconcilers.Instance daemonSet *appsv1.DaemonSet + deployment *appsv1.Deployment + service *corev1.Service promService *corev1.Service serviceAccount *corev1.ServiceAccount staticConfigMap *corev1.ConfigMap @@ -39,6 +41,8 @@ func newMonolithReconciler(cmn *reconcilers.Instance) *monolithReconciler { rec := monolithReconciler{ Instance: cmn, daemonSet: cmn.Managed.NewDaemonSet(monoName), + deployment: cmn.Managed.NewDeployment(monoName), + service: cmn.Managed.NewService(monoName), promService: cmn.Managed.NewService(constants.FLPMetricsSvcName), serviceAccount: cmn.Managed.NewServiceAccount(monoName), staticConfigMap: cmn.Managed.NewConfigMap(monoConfigMap), @@ -110,6 +114,14 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest return err } + if desired.Spec.UseHostNetwork() { + r.Managed.TryDelete(ctx, r.service) + } else { + if err := r.reconcileService(ctx, &builder); err != nil { + return err + } + } + err = r.reconcilePrometheusService(ctx, &builder) if err != nil { return 
err @@ -133,7 +145,15 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest return err } - return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations)) + if desired.Spec.UseHostNetwork() { + // Use DaemonSet + r.Managed.TryDelete(ctx, r.deployment) + return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations)) + } + + // Use Deployment + r.Managed.TryDelete(ctx, r.daemonSet) + return r.reconcileDeployment(ctx, &desired.Spec.Processor, &builder, annotations) } func (r *monolithReconciler) reconcileDynamicConfigMap(ctx context.Context, newDCM *corev1.ConfigMap) error { @@ -149,6 +169,16 @@ func (r *monolithReconciler) reconcileDynamicConfigMap(ctx context.Context, newD return nil } +func (r *monolithReconciler) reconcileService(ctx context.Context, builder *monolithBuilder) error { + report := helper.NewChangeReport("FLP service") + defer report.LogIfNeeded(ctx) + + if err := r.ReconcileService(ctx, r.service, builder.service(), &report); err != nil { + return err + } + return nil +} + func (r *monolithReconciler) reconcilePrometheusService(ctx context.Context, builder *monolithBuilder) error { report := helper.NewChangeReport("FLP prometheus service") defer report.LogIfNeeded(ctx) @@ -186,6 +216,21 @@ func (r *monolithReconciler) reconcileDaemonSet(ctx context.Context, desiredDS * ) } +func (r *monolithReconciler) reconcileDeployment(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *monolithBuilder, annotations map[string]string) error { + report := helper.NewChangeReport("FLP Deployment") + defer report.LogIfNeeded(ctx) + + return reconcilers.ReconcileDeployment( + ctx, + r.Instance, + r.deployment, + builder.deployment(annotations), + constants.FLPName, + desiredFLP.IsUnmanagedFLPReplicas(), + &report, + ) +} + func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder *monolithBuilder) error { if !r.Managed.Exists(r.serviceAccount) { return r.CreateOwned(ctx, builder.serviceAccount()) 
@@ -198,7 +243,7 @@ func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder * } // Host network - if r.ClusterInfo.IsOpenShift() { + if r.ClusterInfo.IsOpenShift() && builder.desired.UseHostNetwork() { r.rbHostNetwork = resources.GetClusterRoleBinding(r.Namespace, monoShortName, monoName, monoName, constants.HostNetworkRole) if err := r.ReconcileClusterRoleBinding(ctx, r.rbHostNetwork); err != nil { return err diff --git a/internal/controller/flp/flp_test.go b/internal/controller/flp/flp_test.go index ff9e2acc4..cf443fdbe 100644 --- a/internal/controller/flp/flp_test.go +++ b/internal/controller/flp/flp_test.go @@ -1,19 +1,3 @@ -/* -Copyright 2021. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - package flp import ( @@ -73,7 +57,7 @@ func getConfig() flowslatest.FlowCollectorSpec { }, }, }, - KafkaConsumerReplicas: ptr.To(int32(1)), + ConsumerReplicas: ptr.To(int32(1)), KafkaConsumerAutoscaler: flowslatest.FlowCollectorHPA{ Status: flowslatest.HPAStatusEnabled, MinReplicas: &minReplicas, @@ -356,7 +340,7 @@ func TestDeploymentNoChange(t *testing.T) { second := b.deployment(annotate(digest)) report := helper.NewChangeReport("") - assert.False(helper.DeploymentChanged(first, second, constants.FLPName, !cfg.Processor.KafkaConsumerAutoscaler.HPAEnabled(), *cfg.Processor.KafkaConsumerReplicas, &report)) + assert.False(helper.DeploymentChanged(first, second, constants.FLPName, &report)) assert.Contains(report.String(), "no change") } @@ -380,7 +364,10 @@ func TestDeploymentChanged(t *testing.T) { report := helper.NewChangeReport("") checkChanged := func(old, newd *appsv1.Deployment, spec flowslatest.FlowCollectorSpec) bool { - return helper.DeploymentChanged(old, newd, constants.FLPName, !cfg.Processor.KafkaConsumerAutoscaler.HPAEnabled(), *spec.Processor.KafkaConsumerReplicas, &report) + if spec.Processor.IsUnmanagedFLPReplicas() { + newd.Spec.Replicas = old.Spec.Replicas + } + return helper.DeploymentChanged(old, newd, constants.FLPName, &report) } assert.True(checkChanged(first, second, cfg)) @@ -430,7 +417,7 @@ func TestDeploymentChanged(t *testing.T) { // Check replicas didn't change because HPA is used cfg2 := cfg - cfg2.Processor.KafkaConsumerReplicas = ptr.To(int32(5)) + cfg2.Processor.ConsumerReplicas = ptr.To(int32(5)) b = transfBuilder(ns, &cfg2) _, digest, _, err = b.configMaps() assert.NoError(err) @@ -454,14 +441,14 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { // Check replicas changed (need to copy flp, as Spec.Replicas stores a pointer) cfg2 := cfg - cfg2.Processor.KafkaConsumerReplicas = ptr.To(int32(5)) + cfg2.Processor.ConsumerReplicas = ptr.To(int32(5)) b = transfBuilder(ns, &cfg2) _, digest, _, err = b.configMaps() 
assert.NoError(err) second := b.deployment(annotate(digest)) report := helper.NewChangeReport("") - assert.True(helper.DeploymentChanged(first, second, constants.FLPName, !cfg2.Processor.KafkaConsumerAutoscaler.HPAEnabled(), *cfg2.Processor.KafkaConsumerReplicas, &report)) + assert.True(helper.DeploymentChanged(first, second, constants.FLPName, &report)) assert.Contains(report.String(), "Replicas changed") } diff --git a/internal/controller/flp/flp_transfo_objects.go b/internal/controller/flp/flp_transfo_objects.go index cc6f24883..49c5fa87e 100644 --- a/internal/controller/flp/flp_transfo_objects.go +++ b/internal/controller/flp/flp_transfo_objects.go @@ -58,10 +58,10 @@ func (b *transfoBuilder) deployment(annotations map[string]string) *appsv1.Deplo transfoConfigMap, b.desired, &b.volumes, - false, /*no listen*/ - false, /*no host network*/ + pull, annotations, ) + replicas := b.desired.Processor.GetFLPReplicas() return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: transfoName, @@ -73,7 +73,7 @@ func (b *transfoBuilder) deployment(annotations map[string]string) *appsv1.Deplo }, }, Spec: appsv1.DeploymentSpec{ - Replicas: b.desired.Processor.KafkaConsumerReplicas, + Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"app": transfoName}, }, diff --git a/internal/controller/flp/flp_transfo_reconciler.go b/internal/controller/flp/flp_transfo_reconciler.go index 4c7246e46..0b13ebb5c 100644 --- a/internal/controller/flp/flp_transfo_reconciler.go +++ b/internal/controller/flp/flp_transfo_reconciler.go @@ -167,8 +167,7 @@ func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desired r.deployment, builder.deployment(annotations), constants.FLPName, - helper.PtrInt32(desiredFLP.KafkaConsumerReplicas), - &desiredFLP.KafkaConsumerAutoscaler, + desiredFLP.IsUnmanagedFLPReplicas(), &report, ) } diff --git a/internal/controller/networkpolicy/np_objects.go b/internal/controller/networkpolicy/np_objects.go index 
93abbb4ab..930ee5621 100644 --- a/internal/controller/networkpolicy/np_objects.go +++ b/internal/controller/networkpolicy/np_objects.go @@ -23,6 +23,32 @@ func peerInNamespace(ns string) networkingv1.NetworkPolicyPeer { } } +func peerInNamespaces(ns []string) networkingv1.NetworkPolicyPeer { + return networkingv1.NetworkPolicyPeer{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: ns, + }}, + }, + PodSelector: &metav1.LabelSelector{}, // see https://issues.redhat.com/browse/OSDOCS-14395 / needed for apiserver + } +} + +func addAllowedNamespaces(np *networkingv1.NetworkPolicy, in, out []string) { + if len(in) > 0 { + np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ + From: []networkingv1.NetworkPolicyPeer{peerInNamespaces(in)}, + }) + } + if len(out) > 0 { + np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ + To: []networkingv1.NetworkPolicyPeer{peerInNamespaces(out)}, + }) + } +} + func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Manager) (types.NamespacedName, *networkingv1.NetworkPolicy) { ns := desired.Spec.GetNamespace() @@ -31,8 +57,6 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man return name, nil } - privNs := ns + constants.EBPFPrivilegedNSSuffix - np := networkingv1.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{ Name: netpolName, @@ -61,19 +85,26 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man }, }, } - // Allow traffic from the eBPF agents - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(privNs), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(privNs), - }, - }) + 
allowedNamespacesIn := []string{} + allowedNamespacesOut := []string{} + + if desired.Spec.UseLoki() && + desired.Spec.Loki.Mode == flowslatest.LokiModeLokiStack && + desired.Spec.Loki.LokiStack.Namespace != "" && + desired.Spec.Loki.LokiStack.Namespace != ns { + allowedNamespacesIn = append(allowedNamespacesIn, desired.Spec.Loki.LokiStack.Namespace) + allowedNamespacesOut = append(allowedNamespacesOut, desired.Spec.Loki.LokiStack.Namespace) + } if mgr.ClusterInfo.IsOpenShift() { + allowedNamespacesOut = append(allowedNamespacesOut, constants.DNSNamespace) + allowedNamespacesOut = append(allowedNamespacesOut, constants.MonitoringNamespace) + if mgr.Config.DownstreamDeployment { + allowedNamespacesIn = append(allowedNamespacesIn, constants.MonitoringNamespace) + } else { + allowedNamespacesIn = append(allowedNamespacesIn, constants.UWMonitoringNamespace) + } + if desired.Spec.UseConsolePlugin() && mgr.ClusterInfo.HasConsolePlugin() { advanced := helper.GetAdvancedPluginConfig(desired.Spec.ConsolePlugin.Advanced) np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ @@ -85,41 +116,20 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man Port: ptr.To(intstr.FromInt32(*advanced.Port)), }}, }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.ConsoleNamespace), - }, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: ptr.To(corev1.ProtocolTCP), - Port: ptr.To(intstr.FromInt32(*advanced.Port)), - }}, - }) } - if mgr.Config.DownstreamDeployment { - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.MonitoringNamespace), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.MonitoringNamespace), - }, - }) - - } else 
{ - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.UWMonitoringNamespace), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.UWMonitoringNamespace), - }, + // Allow apiserver/host + hostNetworkPorts := []networkingv1.NetworkPolicyPort{{ + Protocol: ptr.To(corev1.ProtocolTCP), + Port: ptr.To(intstr.FromInt32(constants.WebhookPort)), + }} + if desired.Spec.DeploymentModel == flowslatest.DeploymentModelService { + // Can be counter-intuitive, but only the DeploymentModelService mode needs an explicit rule for host-network (agents are still hostnetwork pods) + advanced := helper.GetAdvancedProcessorConfig(&desired.Spec) + hostNetworkPorts = append(hostNetworkPorts, networkingv1.NetworkPolicyPort{ + Protocol: ptr.To(corev1.ProtocolTCP), + Port: ptr.To(intstr.FromInt32(*advanced.Port)), }) } - // Allow apiserver/host np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ From: []networkingv1.NetworkPolicyPeer{ { @@ -128,60 +138,28 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man }}, }, }, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: ptr.To(corev1.ProtocolTCP), - Port: ptr.To(intstr.FromInt32(constants.WebhookPort)), - }}, + Ports: hostNetworkPorts, }) - // Allow host + // Allow fetching from apiserver np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{}, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: ptr.To(corev1.ProtocolTCP), - Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)), - }}, - }) - // Allow apiserver - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - { - NamespaceSelector: &metav1.LabelSelector{MatchLabels: 
map[string]string{ - "policy-group.network.openshift.io/host-network": "", - }}, - }, + To: []networkingv1.NetworkPolicyPeer{ + peerInNamespaces([]string{constants.OpenShiftAPIServerNamespace, constants.OpenShiftKubeAPIServerNamespace}), }, Ports: []networkingv1.NetworkPolicyPort{{ Protocol: ptr.To(corev1.ProtocolTCP), - Port: ptr.To(intstr.FromInt32(constants.WebhookPort)), + Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)), }}, }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.DNSNamespace), - }, - }) - if desired.Spec.UseLoki() && desired.Spec.Loki.Mode == flowslatest.LokiModeLokiStack { - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(desired.Spec.Loki.LokiStack.Namespace), - }, - }) - } + } else { + // Not OpenShift + // Allow fetching from apiserver / kube-system + allowedNamespacesOut = append(allowedNamespacesOut, constants.KubeSystemNamespace) } - for _, aNs := range desired.Spec.NetworkPolicy.AdditionalNamespaces { - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(aNs), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(aNs), - }, - }) + allowedNamespacesIn = append(allowedNamespacesIn, desired.Spec.NetworkPolicy.AdditionalNamespaces...) + allowedNamespacesOut = append(allowedNamespacesOut, desired.Spec.NetworkPolicy.AdditionalNamespaces...) 
- } + addAllowedNamespaces(&np, allowedNamespacesIn, allowedNamespacesOut) return name, &np } @@ -211,53 +189,18 @@ func buildPrivilegedNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manag }, } - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(mainNs), - }, - }) + // Note that we don't need explicit authorizations for egress as agent pods are on hostnetwork, which allows us to further lock the namespace + allowedNamespacesIn := []string{} if mgr.ClusterInfo.IsOpenShift() { if mgr.Config.DownstreamDeployment { - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.MonitoringNamespace), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.MonitoringNamespace), - }, - }) - + allowedNamespacesIn = append(allowedNamespacesIn, constants.MonitoringNamespace) } else { - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.UWMonitoringNamespace), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(constants.UWMonitoringNamespace), - }, - }) - + allowedNamespacesIn = append(allowedNamespacesIn, constants.UWMonitoringNamespace) } } - for _, aNs := range desired.Spec.NetworkPolicy.AdditionalNamespaces { - np.Spec.Ingress = append(np.Spec.Ingress, networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(aNs), - }, - }) - np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{ - peerInNamespace(aNs), - }, - }) - - } + addAllowedNamespaces(&np, allowedNamespacesIn, nil) return name, &np } diff 
--git a/internal/controller/networkpolicy/np_test.go b/internal/controller/networkpolicy/np_test.go index cb3175b34..6ab035172 100644 --- a/internal/controller/networkpolicy/np_test.go +++ b/internal/controller/networkpolicy/np_test.go @@ -46,8 +46,8 @@ func getConfig() flowslatest.FlowCollector { DeploymentModel: flowslatest.DeploymentModelDirect, Agent: flowslatest.FlowCollectorAgent{Type: flowslatest.AgentEBPF}, Processor: flowslatest.FlowCollectorFLP{ - LogLevel: "trace", - KafkaConsumerReplicas: ptr.To(int32(1)), + LogLevel: "trace", + ConsumerReplicas: ptr.To(int32(1)), KafkaConsumerAutoscaler: flowslatest.FlowCollectorHPA{ Status: flowslatest.HPAStatusEnabled, Metrics: []ascv2.MetricSpec{}, @@ -98,13 +98,6 @@ func TestNpBuilder(t *testing.T) { {From: []networkingv1.NetworkPolicyPeer{ {PodSelector: &metav1.LabelSelector{}}, }}, - {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "netobserv-privileged", - }, - }}, - }}, }, np.Spec.Ingress) assert.Equal([]networkingv1.NetworkPolicyEgressRule{ @@ -112,13 +105,15 @@ func TestNpBuilder(t *testing.T) { {PodSelector: &metav1.LabelSelector{}}, }}, {To: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "netobserv-privileged", - }, + {PodSelector: &metav1.LabelSelector{}, NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"kube-system"}, + }}, }}, }}, - }[1], np.Spec.Egress[1]) + }, np.Spec.Egress) name, np = buildPrivilegedNetworkPolicy(&desired, mgr) assert.NotNil(np) @@ -136,24 +131,12 @@ func TestNpBuilder(t *testing.T) { {PodSelector: &metav1.LabelSelector{}}, }}, {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: 
map[string]string{ - "kubernetes.io/metadata.name": "netobserv-privileged", - }, - }}, - }}, - {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "foo", - }, - }}, - }}, - {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "bar", - }, + {PodSelector: &metav1.LabelSelector{}, NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"foo", "bar"}, + }}, }}, }}, }, np.Spec.Ingress) @@ -163,24 +146,12 @@ func TestNpBuilder(t *testing.T) { {PodSelector: &metav1.LabelSelector{}}, }}, {To: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "netobserv-privileged", - }, - }}, - }}, - {To: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "foo", - }, - }}, - }}, - {To: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "bar", - }, + {PodSelector: &metav1.LabelSelector{}, NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"kube-system", "foo", "bar"}, + }}, }}, }}, }, np.Spec.Egress) @@ -189,19 +160,5 @@ func TestNpBuilder(t *testing.T) { assert.NotNil(np) assert.Equal(np.ObjectMeta.Name, name.Name) assert.Equal(np.ObjectMeta.Namespace, name.Namespace) - assert.Equal([]networkingv1.NetworkPolicyIngressRule{ - {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: 
map[string]string{ - "kubernetes.io/metadata.name": "foo", - }, - }}, - }}, - {From: []networkingv1.NetworkPolicyPeer{ - {NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "kubernetes.io/metadata.name": "bar", - }, - }}, - }}}, np.Spec.Ingress) + assert.Equal([]networkingv1.NetworkPolicyIngressRule{}, np.Spec.Ingress) } diff --git a/internal/controller/reconcilers/reconcilers.go b/internal/controller/reconcilers/reconcilers.go index 81f411342..061304101 100644 --- a/internal/controller/reconcilers/reconcilers.go +++ b/internal/controller/reconcilers/reconcilers.go @@ -134,13 +134,16 @@ func ReconcileDaemonSet(ctx context.Context, ci *Instance, old, n *appsv1.Daemon return nil } -func ReconcileDeployment(ctx context.Context, ci *Instance, old, n *appsv1.Deployment, containerName string, replicas int32, hpa *flowslatest.FlowCollectorHPA, report *helper.ChangeReport) error { +func ReconcileDeployment(ctx context.Context, ci *Instance, old, n *appsv1.Deployment, containerName string, ignoreReplicas bool, report *helper.ChangeReport) error { if !ci.Managed.Exists(old) { ci.Status.SetCreatingDeployment(n) return ci.CreateOwned(ctx, n) } ci.Status.CheckDeploymentProgress(old) - if helper.DeploymentChanged(old, n, containerName, !hpa.HPAEnabled(), replicas, report) { + if ignoreReplicas { + n.Spec.Replicas = old.Spec.Replicas + } + if helper.DeploymentChanged(old, n, containerName, report) { return ci.UpdateIfOwned(ctx, old, n) } return nil @@ -148,7 +151,7 @@ func ReconcileDeployment(ctx context.Context, ci *Instance, old, n *appsv1.Deplo func ReconcileHPA(ctx context.Context, ci *Instance, old, n *ascv2.HorizontalPodAutoscaler, desired *flowslatest.FlowCollectorHPA, report *helper.ChangeReport) error { // Delete or Create / Update Autoscaler according to HPA option - if desired.HPAEnabled() { + if desired.IsHPAEnabled() { if !ci.Managed.Exists(old) { return ci.CreateOwned(ctx, n) } else if helper.AutoScalerChanged(old, *desired, report) { diff 
--git a/internal/pkg/helper/comparators.go b/internal/pkg/helper/comparators.go index b59b02d39..f190c1495 100644 --- a/internal/pkg/helper/comparators.go +++ b/internal/pkg/helper/comparators.go @@ -52,9 +52,9 @@ func DaemonSetChanged(current, desired *appsv1.DaemonSet) ReconcileAction { return ActionNone } -func DeploymentChanged(old, n *appsv1.Deployment, contName string, checkReplicas bool, desiredReplicas int32, report *ChangeReport) bool { +func DeploymentChanged(old, n *appsv1.Deployment, contName string, report *ChangeReport) bool { return report.Check("Pod changed", PodChanged(&old.Spec.Template, &n.Spec.Template, contName, report)) || - report.Check("Replicas changed", (checkReplicas && *old.Spec.Replicas != desiredReplicas)) + report.Check("Replicas changed", *old.Spec.Replicas != *n.Spec.Replicas) } func PodChanged(old, n *corev1.PodTemplateSpec, containerName string, report *ChangeReport) bool { diff --git a/internal/pkg/helper/flowcollector.go b/internal/pkg/helper/flowcollector.go index 4ad76325a..1d443645c 100644 --- a/internal/pkg/helper/flowcollector.go +++ b/internal/pkg/helper/flowcollector.go @@ -14,30 +14,6 @@ const ( netobservManagedLabel = "netobserv-managed" ) -func GetSampling(spec *flowslatest.FlowCollectorSpec) int { - if spec.Agent.EBPF.Sampling == nil { - return 50 - } - return int(*spec.Agent.EBPF.Sampling) -} - -func UseKafka(spec *flowslatest.FlowCollectorSpec) bool { - return spec.DeploymentModel == flowslatest.DeploymentModelKafka -} - -func HasKafkaExporter(spec *flowslatest.FlowCollectorSpec) bool { - for _, ex := range spec.Exporters { - if ex.Type == flowslatest.KafkaExporter { - return true - } - } - return false -} - -func HPAEnabled(spec *flowslatest.FlowCollectorHPA) bool { - return spec != nil && spec.Status == flowslatest.HPAStatusEnabled -} - func GetRecordTypes(processor *flowslatest.FlowCollectorFLP) []api.ConnTrackOutputRecordTypeEnum { if processor.LogTypes != nil { switch *processor.LogTypes { @@ -148,7 +124,8 @@ 
func GetAdvancedAgentConfig(specConfig *flowslatest.AdvancedAgentConfig) flowsla func GetAdvancedProcessorConfig(spec *flowslatest.FlowCollectorSpec) flowslatest.AdvancedProcessorConfig { var defaultToleration []corev1.Toleration - if !UseKafka(spec) { // TODO: with coming-soon Service deploymentModel, this should be replaced with "UseHostNetwork" (conflict should pop up while rebasing). + if spec.UseHostNetwork() { + // When using host-network, i.e. 1-1 pairing with agents, FLP should have the same toleration as agents defaultToleration = []corev1.Toleration{{Operator: corev1.TolerationOpExists}} } cfg := flowslatest.AdvancedProcessorConfig{