Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/flowlogs-pipeline.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,37 @@ Following is the supported API format for network transformations:
flowDirectionField: field providing the flow direction in the input entries; it will be rewritten
ifDirectionField: interface-level field for flow direction, to create in output
</pre>
## Transform Anomaly API
Following is the supported API format for anomaly detection transformations:

<pre>
anomaly:
algorithm: (enum) algorithm used to score anomalies: ewma or zscore
valueField: field containing the numeric value to evaluate
keyFields: list of fields combined to build the per-entity baseline key
windowSize: number of recent samples to keep for baseline statistics
baselineWindow: minimum number of samples before anomaly scores are emitted
sensitivity: threshold multiplier for flagging anomalies (e.g., z-score)
ewmaAlpha: smoothing factor for ewma algorithm; derived from windowSize if omitted
</pre>

Example pipeline stage:

<pre>
- name: anomaly
transform:
type: anomaly
anomaly:
algorithm: zscore
valueField: bytes
keyFields: [srcIP, dstIP, proto]
windowSize: 20
baselineWindow: 5
sensitivity: 3
</pre>
The anomaly stage appends `anomaly_score`, `anomaly_type`, and `baseline_window` to each flow record. Scores reflect the chosen
algorithm (EWMA or z-score); a warming-up period suppresses alerts until the baseline window is populated. Grafana users can
alert on `anomaly_type != "normal"` or threshold `anomaly_score` using the emitted fields.
## Write Loki API
Following is the supported API format for writing to loki:

Expand Down
27 changes: 27 additions & 0 deletions hack/examples/pipeline-anomaly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
log-level: info
pipeline:
- name: ingest
- name: detect-anomaly
follows: ingest
- name: write
follows: detect-anomaly
parameters:
- name: ingest
ingest:
type: synthetic
synthetic:
flowLogsPerMin: 10
- name: detect-anomaly
transform:
type: anomaly
anomaly:
algorithm: zscore
valueField: Bytes
keyFields: [SrcAddr, DstAddr, Proto]
windowSize: 20
baselineWindow: 5
sensitivity: 3
- name: write
write:
type: stdout
Comment on lines +25 to +27
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good enough as example but could you explain what's the final goal for your usage ?

Do you want to expose that in a prometheus metric or somewhere else ?

2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
GenericType = "generic"
NetworkType = "network"
FilterType = "filter"
AnomalyType = "anomaly"
ConnTrackType = "conntrack"
NoneType = "none"

Expand All @@ -60,6 +61,7 @@ type API struct {
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
TransformAnomaly TransformAnomaly `yaml:"anomaly" doc:"## Transform Anomaly API\nFollowing is the supported API format for anomaly detection transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
WriteIPFIX WriteIpfix `yaml:"ipfix" doc:"## Write IPFIX\nFollowing is the supported API format for writing to an IPFIX collector:\n"`
Expand Down
38 changes: 38 additions & 0 deletions pkg/api/transform_anomaly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2024 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

// TransformAnomalyAlgorithm defines the supported anomaly detection strategies.
// For doc generation, enum definitions must match format `Constant Type = "value" // doc`
type TransformAnomalyAlgorithm string

const (
AnomalyAlgorithmEWMA TransformAnomalyAlgorithm = "ewma" // exponentially weighted moving average baseline
AnomalyAlgorithmZScore TransformAnomalyAlgorithm = "zscore" // rolling z-score over a sliding window
)

// TransformAnomaly describes configuration for anomaly detection stages.
type TransformAnomaly struct {
Algorithm TransformAnomalyAlgorithm `yaml:"algorithm,omitempty" json:"algorithm,omitempty" doc:"(enum) algorithm used to score anomalies: ewma or zscore"`
ValueField string `yaml:"valueField,omitempty" json:"valueField,omitempty" doc:"field containing the numeric value to evaluate"`
KeyFields []string `yaml:"keyFields,omitempty" json:"keyFields,omitempty" doc:"list of fields combined to build the per-entity baseline key"`
WindowSize int `yaml:"windowSize,omitempty" json:"windowSize,omitempty" doc:"number of recent samples to keep for baseline statistics"`
BaselineWindow int `yaml:"baselineWindow,omitempty" json:"baselineWindow,omitempty" doc:"minimum number of samples before anomaly scores are emitted"`
Sensitivity float64 `yaml:"sensitivity,omitempty" json:"sensitivity,omitempty" doc:"threshold multiplier for flagging anomalies (e.g., z-score)"`
EWMAAlpha float64 `yaml:"ewmaAlpha,omitempty" json:"ewmaAlpha,omitempty" doc:"smoothing factor for ewma algorithm; derived from windowSize if omitted"`
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Transform struct {
Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"`
Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"`
Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"`
Anomaly *api.TransformAnomaly `yaml:"anomaly,omitempty" json:"anomaly,omitempty"`
}

type Extract struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/stage_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam
return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}
}

//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewTransformAnomalyParams(name string, an api.TransformAnomaly) StageParam {
return StageParam{Name: name, Transform: &Transform{Type: api.AnomalyType, Anomaly: &an}}
}

//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewConnTrackParams(name string, ct api.ConnTrack) StageParam {
return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (t
transformer, err = transform.NewTransformFilter(params)
case api.NetworkType:
transformer, err = transform.NewTransformNetwork(params, opMetrics)
case api.AnomalyType:
transformer, err = transform.NewTransformAnomaly(params, opMetrics)
case api.NoneType:
transformer, err = transform.NewTransformNone()
default:
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Definition struct {
Type string
Generic api.TransformGeneric
Network api.TransformNetwork
Anomaly api.TransformAnomaly
}

type Definitions []Definition
Loading
Loading