3 changes: 3 additions & 0 deletions README.md
@@ -72,6 +72,8 @@ metadata:
name: cluster
spec:
namespace: netobserv
networkPolicy:
enable: false
consolePlugin:
advanced:
env:
@@ -91,6 +93,7 @@ A few remarks:
- While the [web console](https://github.com/netobserv/network-observability-console-plugin) is primarily designed as a plugin for the OpenShift Console, it is still possible to deploy it as a standalone, which the dev team sometimes uses for testing. This is why it is mentioned as "TEST_CONSOLE" here.
- If you're on OpenShift, you should omit "TEST_CONSOLE: true" and use the Console plugin instead, which offers a better, more integrated experience.
- You can change the Prometheus and Loki URLs depending on your installation. This example works if you use the "standalone" installation described above, with `install.loki=true` and `install.prom=true`. Check more configuration options for [Prometheus](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecprometheus-1) and [Loki](https://github.com/netobserv/network-observability-operator/blob/main/docs/FlowCollector.md#flowcollectorspecloki-1).
- You can enable `networkPolicy`, which makes the operator lock down the namespaces that it manages; however, this depends heavily on your cluster topology and may cause malfunctions, such as preventing NetObserv pods from communicating with the Kube API server. A minimal example is sketched below.
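
For illustration, a minimal sketch with the policy turned on; `additionalNamespaces` is optional, and the `openshift-monitoring` value is only an assumed example of a namespace that may need access:

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  namespace: netobserv
  networkPolicy:
    enable: true
    # Optional: extra namespaces allowed to communicate with the NetObserv namespace
    # (example value, adjust to your topology)
    additionalNamespaces: ["openshift-monitoring"]
```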

To view the test console, you can port-forward 9001:

35 changes: 27 additions & 8 deletions api/flowcollector/v1beta2/flowcollector_types.go
@@ -27,8 +27,9 @@ import (
type FlowCollectorDeploymentModel string

const (
DeploymentModelDirect FlowCollectorDeploymentModel = "Direct"
DeploymentModelKafka FlowCollectorDeploymentModel = "Kafka"
DeploymentModelDirect FlowCollectorDeploymentModel = "Direct"
DeploymentModelKafka FlowCollectorDeploymentModel = "Kafka"
DeploymentModelService FlowCollectorDeploymentModel = "Service"
)

// Please notice that the FlowCollectorSpec's properties MUST redefine one of the default
@@ -69,11 +70,13 @@ type FlowCollectorSpec struct {
ConsolePlugin FlowCollectorConsolePlugin `json:"consolePlugin,omitempty"`

// `deploymentModel` defines the desired type of deployment for flow processing. Possible values are:<br>
// - `Direct` (default) to make the flow processor listen directly from the agents.<br>
// - `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.<br>
// - `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.<br>
// - `Kafka` to have flows sent to a Kafka pipeline before consumption by the processor.<br>
// Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
// Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).<br>
// `Direct` is not recommended on large clusters as it is less memory efficient.
// +unionDiscriminator
// +kubebuilder:validation:Enum:="Direct";"Kafka"
// +kubebuilder:validation:Enum:="Direct";"Service";"Kafka"
// +kubebuilder:default:=Direct
DeploymentModel FlowCollectorDeploymentModel `json:"deploymentModel,omitempty"`
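
As an illustration, a minimal sketch selecting the new model in a FlowCollector resource (values taken from the enum above; illustrative only, not a tuned configuration):

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  deploymentModel: Service  # one of Direct (default), Service, Kafka
```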

@@ -626,15 +629,26 @@ type FlowCollectorFLP struct {

//+kubebuilder:validation:Minimum=0
//+kubebuilder:default:=3
// `kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
// `kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
// This setting is ignored when Kafka is disabled.
// Deprecation notice: use `spec.processor.consumerReplicas` instead.
KafkaConsumerReplicas *int32 `json:"kafkaConsumerReplicas,omitempty"`

// `kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
// `kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
// This setting is ignored when Kafka is disabled.
// Deprecation notice: managed autoscaler will be removed in a future version. You may instead configure an autoscaler of your choice and set `spec.processor.unmanagedReplicas` to `true`.
// +optional
KafkaConsumerAutoscaler FlowCollectorHPA `json:"kafkaConsumerAutoscaler,omitempty"`

//+kubebuilder:validation:Minimum=0
// `consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`.
// This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`.
ConsumerReplicas *int32 `json:"consumerReplicas,omitempty"`

// If `unmanagedReplicas` is `true`, the operator will not reconcile `consumerReplicas`. This is useful when using a pod autoscaler.
// +optional
UnmanagedReplicas bool `json:"unmanagedReplicas,omitempty"`

//+kubebuilder:default:=1000
// +optional
// `kafkaConsumerQueueCapacity` defines the capacity of the internal message queue used in the Kafka consumer client. Ignored when not using Kafka.
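
Putting the new fields together, a hedged sketch of a `Service`-model processor with a fixed replica count (field paths from this PR; the value `5` is arbitrary):

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  deploymentModel: Service
  processor:
    consumerReplicas: 5       # ignored when deploymentModel is Direct
    unmanagedReplicas: false  # set to true to delegate scaling to an external autoscaler
```
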
@@ -1025,6 +1039,10 @@ type FlowCollectorConsolePlugin struct {
// `replicas` defines the number of replicas (pods) to start.
Replicas *int32 `json:"replicas,omitempty"`

// If `unmanagedReplicas` is `true`, the operator will not reconcile `replicas`. This is useful when using a pod autoscaler.
// +optional
UnmanagedReplicas bool `json:"unmanagedReplicas,omitempty"`

//+kubebuilder:validation:Enum=IfNotPresent;Always;Never
//+kubebuilder:default:=IfNotPresent
// `imagePullPolicy` is the Kubernetes pull policy for the image defined above
@@ -1041,7 +1059,8 @@ type FlowCollectorConsolePlugin struct {
// `logLevel` for the console plugin backend
LogLevel string `json:"logLevel,omitempty"`

// `autoscaler` spec of a horizontal pod autoscaler to set up for the plugin Deployment.
// `autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment.
// Deprecation notice: managed autoscaler will be removed in a future version. You may instead configure an autoscaler of your choice and set `spec.consolePlugin.unmanagedReplicas` to `true`.
// +optional
Autoscaler FlowCollectorHPA `json:"autoscaler,omitempty"`
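
To pair `unmanagedReplicas` with an autoscaler of your choice, a sketch like the following could replace the deprecated managed `autoscaler`. The Deployment name `netobserv-plugin` and namespace `netobserv` are assumptions; adjust them to your installation:

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  consolePlugin:
    unmanagedReplicas: true  # the operator stops reconciling `replicas`
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: netobserv-plugin
  namespace: netobserv
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: netobserv-plugin  # assumed console plugin Deployment name
  minReplicas: 1
  maxReplicas: 3
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 80
```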

15 changes: 10 additions & 5 deletions api/flowcollector/v1beta2/flowcollector_validation_webhook.go
@@ -240,11 +240,16 @@ func (v *validator) validateScheduling() {
}

func (v *validator) validateFLPLogTypes() {
if v.fc.Processor.LogTypes != nil && *v.fc.Processor.LogTypes == LogTypeAll {
v.warnings = append(v.warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint")
}
if v.fc.Processor.LogTypes != nil && *v.fc.Processor.LogTypes != LogTypeFlows && v.fc.Loki.Enable != nil && !*v.fc.Loki.Enable {
v.errors = append(v.errors, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit"))
if v.fc.Processor.HasConntrack() {
if *v.fc.Processor.LogTypes == LogTypeAll {
v.warnings = append(v.warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint")
}
if !v.fc.UseLoki() {
v.errors = append(v.errors, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit"))
}
if v.fc.DeploymentModel == DeploymentModelService {
v.errors = append(v.errors, errors.New("cannot enable conversation tracking when spec.deploymentModel is Service: you must disable it, or change the deployment model"))
}
}
}
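
For reference, a FlowCollector sketch that this webhook now rejects (assuming `LogTypeConversations` corresponds to the `Conversations` enum value):

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  deploymentModel: Service
  processor:
    logTypes: Conversations  # conversation tracking + Service model is rejected
```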

15 changes: 15 additions & 0 deletions api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
@@ -509,6 +509,21 @@ func TestValidateConntrack(t *testing.T) {
},
expectedError: "enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit",
},
{
name: "Conntrack not allowed with deploymentModel Service",
fc: &FlowCollector{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster",
},
Spec: FlowCollectorSpec{
DeploymentModel: DeploymentModelService,
Processor: FlowCollectorFLP{
LogTypes: ptr.To(LogTypeConversations),
},
},
},
expectedError: "cannot enable conversation tracking when spec.deploymentModel is Service: you must disable it, or change the deployment model",
},
}

r := FlowCollector{}
35 changes: 31 additions & 4 deletions api/flowcollector/v1beta2/helper.go
@@ -33,10 +33,6 @@ func (spec *FlowCollectorSpec) HasKafkaExporter() bool {
return false
}

func (spec *FlowCollectorHPA) HPAEnabled() bool {
return spec != nil && spec.Status == HPAStatusEnabled
}

func (cfg *SASLConfig) UseSASL() bool {
return cfg.Type == SASLPlain || cfg.Type == SASLScramSHA512
}
@@ -67,6 +63,10 @@ func (spec *FlowCollectorSpec) UseTestConsolePlugin() bool {
return false
}

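// UseHostNetwork reports whether the processor runs on the host network,
// which is the case only with the Direct deployment model.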
func (spec *FlowCollectorSpec) UseHostNetwork() bool {
return spec.DeploymentModel == DeploymentModelDirect
}

func (spec *FlowCollectorEBPF) IsAgentFeatureEnabled(feature AgentFeature) bool {
for _, f := range spec.Features {
if f == feature {
@@ -196,3 +196,30 @@ func (spec *FlowCollectorSpec) HasExperimentalAlertsHealth() bool {
func (spec *FlowCollectorSpec) DeployNetworkPolicy() bool {
return spec.NetworkPolicy.Enable != nil && *spec.NetworkPolicy.Enable
}

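// GetFLPReplicas returns the effective flowlogs-pipeline replica count, preferring
// the new consumerReplicas field over the deprecated kafkaConsumerReplicas,
// and falling back to the default of 3.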
func (spec *FlowCollectorFLP) GetFLPReplicas() int32 {
if spec.ConsumerReplicas != nil {
return *spec.ConsumerReplicas
} else if spec.KafkaConsumerReplicas != nil {
return *spec.KafkaConsumerReplicas
}
return 3
}

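// IsHPAEnabled reports whether the managed horizontal pod autoscaler is enabled.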
func (spec *FlowCollectorHPA) IsHPAEnabled() bool {
return spec != nil && spec.Status == HPAStatusEnabled
}

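// IsUnmanagedFLPReplicas reports whether flowlogs-pipeline replicas are managed
// outside of the operator, either explicitly via unmanagedReplicas or implicitly
// via the deprecated managed autoscaler.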
func (spec *FlowCollectorFLP) IsUnmanagedFLPReplicas() bool {
if spec.UnmanagedReplicas {
return true
}
return spec.KafkaConsumerAutoscaler.IsHPAEnabled()
}

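// IsUnmanagedConsolePluginReplicas reports whether console plugin replicas are
// managed outside of the operator, either explicitly via unmanagedReplicas or
// implicitly via the deprecated managed autoscaler.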
func (spec *FlowCollectorConsolePlugin) IsUnmanagedConsolePluginReplicas() bool {
if spec.UnmanagedReplicas {
return true
}
return spec.Autoscaler.IsHPAEnabled()
}
5 changes: 5 additions & 0 deletions api/flowcollector/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.

34 changes: 28 additions & 6 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -2749,8 +2749,9 @@ spec:
type: object
type: object
autoscaler:
description: '`autoscaler` spec of a horizontal pod autoscaler
to set up for the plugin Deployment.'
description: |-
`autoscaler` [deprecated (*)] spec of a horizontal pod autoscaler to set up for the plugin Deployment.
Deprecation notice: managed autoscaler will be removed in a future version. You may instead configure an autoscaler of your choice and set `spec.consolePlugin.unmanagedReplicas` to `true`.
properties:
maxReplicas:
default: 3
@@ -3210,16 +3211,23 @@ spec:
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
type: object
unmanagedReplicas:
description: If `unmanagedReplicas` is `true`, the operator will
not reconcile `replicas`. This is useful when using a pod autoscaler.
type: boolean
type: object
deploymentModel:
default: Direct
description: |-
`deploymentModel` defines the desired type of deployment for flow processing. Possible values are:<br>
- `Direct` (default) to make the flow processor listen directly from the agents.<br>
- `Direct` (default) to make the flow processor listen directly from the agents using the host network, backed by a DaemonSet.<br>
- `Service` to make the flow processor listen as a Kubernetes Service, backed by a scalable Deployment.<br>
- `Kafka` to have flows sent to a Kafka pipeline before consumption by the processor.<br>
Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).
Kafka can provide better scalability, resiliency, and high availability (for more details, see https://www.redhat.com/en/topics/integration/what-is-apache-kafka).<br>
`Direct` is not recommended on large clusters as it is less memory efficient.
enum:
- Direct
- Service
- Kafka
type: string
exporters:
@@ -5367,6 +5375,13 @@ spec:
in the flows data. This is useful in a multi-cluster context.
When using OpenShift, leave empty to make it automatically determined.'
type: string
consumerReplicas:
description: |-
`consumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline`.
This setting is ignored when `spec.deploymentModel` is `Direct` or when `spec.processor.unmanagedReplicas` is `true`.
format: int32
minimum: 0
type: integer
deduper:
description: '`deduper` allows you to sample or drop flows identified
as duplicates, in order to save on resource usage.'
@@ -5435,8 +5450,9 @@ spec:
type: string
kafkaConsumerAutoscaler:
description: |-
`kafkaConsumerAutoscaler` is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
`kafkaConsumerAutoscaler` [deprecated (*)] is the spec of a horizontal pod autoscaler to set up for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
This setting is ignored when Kafka is disabled.
Deprecation notice: managed autoscaler will be removed in a future version. You may instead configure an autoscaler of your choice and set `spec.processor.unmanagedReplicas` to `true`.
properties:
maxReplicas:
default: 3
@@ -5750,8 +5766,9 @@ spec:
kafkaConsumerReplicas:
default: 3
description: |-
`kafkaConsumerReplicas` defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
`kafkaConsumerReplicas` [deprecated (*)] defines the number of replicas (pods) to start for `flowlogs-pipeline-transformer`, which consumes Kafka messages.
This setting is ignored when Kafka is disabled.
Deprecation notice: use `spec.processor.consumerReplicas` instead.
format: int32
minimum: 0
type: integer
@@ -6126,6 +6143,11 @@ spec:
external traffic: flows that are not labeled for those subnets are external to the cluster. Enabled by default on OpenShift.
type: boolean
type: object
unmanagedReplicas:
description: If `unmanagedReplicas` is `true`, the operator will
not reconcile `consumerReplicas`. This is useful when using
a pod autoscaler.
type: boolean
type: object
prometheus:
description: '`prometheus` defines Prometheus settings, such as querier
@@ -568,6 +568,8 @@ spec:
path: consolePlugin.portNaming.enable
- displayName: Port names
path: consolePlugin.portNaming.portNames
- displayName: Unmanaged replicas
path: consolePlugin.unmanagedReplicas
- displayName: Address
path: kafka.address
- displayName: Topic
@@ -606,6 +608,8 @@ spec:
path: networkPolicy.additionalNamespaces
- displayName: Enable
path: networkPolicy.enable
- displayName: Consumer replicas
path: processor.consumerReplicas
- displayName: Deduper
path: processor.deduper
- displayName: Mode
@@ -628,6 +632,8 @@ spec:
path: processor.subnetLabels
- displayName: Custom labels
path: processor.subnetLabels.customLabels
- displayName: Unmanaged replicas
path: processor.unmanagedReplicas
- displayName: Prometheus
path: prometheus
- displayName: Querier
@@ -726,7 +732,7 @@ spec:

- Quick filters (`spec.consolePlugin.quickFilters`): configure preset filters to be displayed in the Console plugin. They offer a way to quickly switch from one set of filters to another, such as showing or hiding the pods network, the infrastructure network, or the application network. They can be tuned to reflect the different workloads running on your cluster. For a list of available filters, [check this page](https://github.com/netobserv/network-observability-operator/blob/1.9.2-community/docs/QuickFilters.md).

- Kafka (`spec.deploymentModel: KAFKA` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created.
- Kafka (`spec.deploymentModel: Kafka` and `spec.kafka`): when enabled, integrates the flow collection pipeline with Kafka, by splitting ingestion from transformation (kube enrichment, derived metrics, ...). Kafka can provide better scalability, resiliency and high availability ([view more details](https://www.redhat.com/en/topics/integration/what-is-apache-kafka)). Assumes Kafka is already deployed and a topic is created.

- Exporters (`spec.exporters`): an optional list of exporters to which to send enriched flows. Kafka and IPFIX exporters are supported. This allows you to define any custom storage or processing that can read from Kafka or use the IPFIX standard. A sketch follows.
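
For instance, a hedged sketch of a Kafka exporter; the address and topic are placeholders to adjust to your Kafka deployment:

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  exporters:
    - type: Kafka
      kafka:
        address: "kafka-cluster-kafka-bootstrap.netobserv"  # placeholder
        topic: "netobserv-flows-export"                     # placeholder
```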
