
Conversation

@vatankh commented Nov 26, 2025

Description

This PR introduces a new anomaly transform stage to flowlogs-pipeline as a first step toward anomaly detection for Kubernetes network flows (see issue #).

Key points:

  • Adds a new transform of type anomaly that computes streaming anomaly scores per key.
  • Supports two algorithms:
    • zscore: rolling z-score over a sliding window.
    • ewma: exponentially weighted moving average baseline.
  • Configuration options:
    • algorithm (ewma | zscore)
    • valueField (numeric field, e.g. Bytes)
    • keyFields (used to group flows per entity, e.g. [SrcAddr, DstAddr, Proto])
    • windowSize, baselineWindow, sensitivity, ewmaAlpha
  • Emits additional fields on each record:
    • anomaly_score
    • anomaly_type (e.g. warming_up, normal, zscore_high, zscore_low, ewma_high, ewma_low)
    • baseline_window (current number of samples in the baseline window)
  • Adds API docs and an example pipeline (hack/examples/pipeline-anomaly.yaml).

This is intentionally a local, per-instance anomaly stage that works on the existing pipeline input only; it does not consume Loki/Kafka yet, as discussed in the issue conversation.
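
As a rough illustration of how the options above fit together, here is a hypothetical stage definition. Field names are taken from the list in this description; the window and threshold values are made up, and the authoritative example is hack/examples/pipeline-anomaly.yaml:

```yaml
# Hypothetical sketch only; see hack/examples/pipeline-anomaly.yaml for
# the example actually shipped with this PR.
- name: detect_anomalies
  transform:
    type: anomaly
    anomaly:
      algorithm: zscore                    # or: ewma
      valueField: Bytes                    # numeric field to score
      keyFields: [SrcAddr, DstAddr, Proto] # one baseline per key tuple
      windowSize: 100                      # illustrative value
      sensitivity: 3.0                     # illustrative value
      ewmaAlpha: 0.3                       # only used with algorithm: ewma
```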

Dependencies

n/a

Testing

  • go test ./pkg/pipeline/transform -run TestTransformAnomaly
  • go test ./...
  • Manual run:
    • go build ./cmd/flowlogs-pipeline
    • ./flowlogs-pipeline --log-level debug --config hack/examples/pipeline-anomaly.yaml

Checklist

If you are not familiar with our processes or don't know what to answer in the list below, let us know in a comment: the maintainers will take care of that.

  • [x] Will this change affect NetObserv / Network Observability operator? If not, you can ignore the rest of this checklist.
    No, this change only adds a new optional transform in flowlogs-pipeline and is not yet wired into the operator.

  • Is this PR backed with a JIRA ticket? If so, make sure it is written as a title prefix (in general, PRs affecting the NetObserv/Network Observability product should be backed with a JIRA ticket - especially if they bring user facing changes).

  • Does this PR require product documentation?

    • If so, make sure the JIRA epic is labelled with "documentation" and provides a description relevant for doc writers, such as use cases or scenarios. Any required step to activate or configure the feature should be documented there, such as new CRD knobs.
  • Does this PR require a product release notes entry?

    • If so, fill in "Release Note Text" in the JIRA.
  • Is there anything else the QE team should know before testing? E.g.: configuration changes, environment setup, etc.

    • If so, make sure it is described in the JIRA ticket.
  • QE requirements (check 1 from the list):

    • Standard QE validation, with pre-merge tests unless stated otherwise.
    • Regression tests only (e.g. refactoring with no user-facing change).
    • No QE (e.g. trivial change with high reviewer's confidence, or per agreement with the QE team).

To run a perfscale test, comment with: /test flp-node-density-heavy-25nodes

openshift-ci bot commented Nov 26, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign oliviercazade for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

openshift-ci bot commented Nov 26, 2025

Hi @vatankh. Thanks for your PR.

I'm waiting for a github.com member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

    func (a *Anomaly) Transform(entry config.GenericMap) (config.GenericMap, bool) {
        value, err := utils.ConvertToFloat64(entry[a.config.ValueField])
        if err != nil {
            anomalyLog.Errorf("unable to convert %s to float: %v", a.config.ValueField, err)

Member: to avoid flooding logs with errors in the data path, we tend to use an error metric rather than logs, like you can see here: https://github.com/netobserv/flowlogs-pipeline/blob/main/pkg/pipeline/encode/metrics_common.go#L192

        parts := make([]string, 0, len(a.config.KeyFields))
        for _, key := range a.config.KeyFields {
            if val, ok := entry[key]; ok {
                parts = append(parts, fmt.Sprint(val))

Member: we use utils.ConvertToString for this kind of conversion - it should be more performant than fmt-package conversions.
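
To illustrate the reviewer's point, here is a standalone sketch of the key-building loop with a hypothetical toString stand-in for utils.ConvertToString (the actual signature of that helper is not shown in this thread; the type switch simply demonstrates why avoiding fmt's reflection path is cheaper on hot-path conversions):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toString is a hypothetical stand-in for utils.ConvertToString: a type
// switch handles the common flow-field types without fmt's reflection cost.
func toString(v interface{}) string {
	switch t := v.(type) {
	case string:
		return t
	case int:
		return strconv.Itoa(t)
	case int64:
		return strconv.FormatInt(t, 10)
	case float64:
		return strconv.FormatFloat(t, 'f', -1, 64)
	default:
		return fmt.Sprint(t) // fallback for rare types
	}
}

// buildKey groups a flow record under its key fields, mirroring the
// loop in the snippet above.
func buildKey(entry map[string]interface{}, keyFields []string) string {
	parts := make([]string, 0, len(keyFields))
	for _, key := range keyFields {
		if val, ok := entry[key]; ok {
			parts = append(parts, toString(val))
		}
	}
	return strings.Join(parts, "|")
}

func main() {
	entry := map[string]interface{}{"SrcAddr": "10.0.0.1", "DstAddr": "10.0.0.2", "Proto": 6}
	fmt.Println(buildKey(entry, []string{"SrcAddr", "DstAddr", "Proto"}))
}
```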

jotak (Member) commented Dec 9, 2025

Thanks @vatankh ! This is looking pretty good already.

I have a few more comments, let's start with the nitpicking one :-) : could you remove .idea from the PR, and add it to .gitignore ?

Then a comment on the API design: as it is, it doesn't allow running several anomaly detections (e.g. on several valueFields, or with different keys). A single Anomaly stage runs for a single value field, and if several stages are defined, they would conflict when writing to the same output fields. A simple way to fix this would be to add a Prefix field to the API config, which would prefix the "anomaly_score", "anomaly_type" and "baseline_window" outputs. Each stage would then define a different prefix, allowing for disambiguation.

Another approach would be to allow multiple value fields in a single stage.
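
Under the Prefix idea, the configuration might look like the sketch below. Everything here is illustrative and not part of the PR: prefix is the hypothetical field being proposed, and the stage names and values are made up.

```yaml
# Hypothetical sketch of the proposed Prefix disambiguation: two anomaly
# stages on different value fields, writing to disjoint output fields.
- name: bytes_anomaly
  transform:
    type: anomaly
    anomaly:
      algorithm: zscore
      valueField: Bytes
      keyFields: [SrcAddr, DstAddr]
      prefix: bytes_     # would emit bytes_anomaly_score, bytes_anomaly_type, ...
- name: packets_anomaly
  transform:
    type: anomaly
    anomaly:
      algorithm: ewma
      valueField: Packets
      keyFields: [SrcAddr, DstAddr]
      prefix: packets_   # would emit packets_anomaly_score, ...
```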

        stddev = math.Max(math.Abs(state.baseline)*1e-6, 1e-9)
    }
    score := math.Abs(deviation) / stddev
    state.baseline = state.baseline + a.alpha*(value-state.baseline)

Collaborator suggested change:
-   state.baseline = state.baseline + a.alpha*(value-state.baseline)
+   state.baseline += a.alpha * (value - state.baseline)
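
For context, here is a simplified, hypothetical sketch of the EWMA update being discussed. It keeps only the fallback denominator visible in the snippet above (the real stage presumably maintains a proper spread estimate as well), and uses the suggested += form:

```go
package main

import (
	"fmt"
	"math"
)

// ewmaState holds the running baseline for one flow key.
// This is an illustrative sketch, not the actual flowlogs-pipeline code.
type ewmaState struct {
	baseline    float64
	initialized bool
}

// score returns |value - baseline| / stddev, then updates the baseline
// in place with the suggested `+=` form.
func (s *ewmaState) score(value, alpha float64) float64 {
	if !s.initialized {
		s.baseline = value
		s.initialized = true
		return 0 // warming up: no baseline to deviate from yet
	}
	deviation := value - s.baseline
	// Fallback guard against a zero denominator, as in the snippet above.
	stddev := math.Max(math.Abs(s.baseline)*1e-6, 1e-9)
	sc := math.Abs(deviation) / stddev
	s.baseline += alpha * deviation // i.e. baseline + alpha*(value-baseline)
	return sc
}

func main() {
	s := &ewmaState{}
	for _, v := range []float64{100, 100, 400} {
		fmt.Printf("score=%.1f baseline=%.1f\n", s.score(v, 0.5), s.baseline)
	}
}
```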

Comment on lines +215 to +222

    anomalyType := "normal"
    if score >= a.sensitivity {
        if value > mean {
            anomalyType = "zscore_high"
        } else {
            anomalyType = "zscore_low"
        }
    }

@jpinsonneau (Collaborator) commented Dec 10, 2025

Should we create an enum for that? You could then use it as the return type instead of string.
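
One way to realize this suggestion is a typed string: the record output stays identical while callers get a dedicated return type. A hypothetical sketch (names are illustrative, covering the anomaly_type values listed in the PR description):

```go
package main

import "fmt"

// AnomalyType is a sketch of the suggested enum: a typed string keeps the
// emitted anomaly_type field unchanged while constraining values in code.
type AnomalyType string

const (
	AnomalyWarmingUp  AnomalyType = "warming_up"
	AnomalyNormal     AnomalyType = "normal"
	AnomalyZScoreHigh AnomalyType = "zscore_high"
	AnomalyZScoreLow  AnomalyType = "zscore_low"
	AnomalyEwmaHigh   AnomalyType = "ewma_high"
	AnomalyEwmaLow    AnomalyType = "ewma_low"
)

// classifyZScore mirrors the branch in the snippet above, returning the
// typed value instead of a raw string.
func classifyZScore(score, sensitivity, value, mean float64) AnomalyType {
	if score < sensitivity {
		return AnomalyNormal
	}
	if value > mean {
		return AnomalyZScoreHigh
	}
	return AnomalyZScoreLow
}

func main() {
	fmt.Println(classifyZScore(3.2, 3.0, 900, 100)) // prints "zscore_high"
}
```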

Comment on lines +25 to +27

      - name: write
        write:
          type: stdout

Collaborator: That's good enough as an example, but could you explain the final goal for your usage? Do you want to expose that in a Prometheus metric, or somewhere else?
