diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 000000000..a0ccf77bc --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Environment-dependent path to Maven home directory +/mavenHomeManager.xml diff --git a/.idea/flowlogs-pipeline.iml b/.idea/flowlogs-pipeline.iml new file mode 100644 index 000000000..d6ebd4805 --- /dev/null +++ b/.idea/flowlogs-pipeline.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 000000000..a9182a4ba --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 000000000..0161eb7a5 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..35eb1ddfb --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/docs/api.md b/docs/api.md index 8d811cc2c..f92275d9a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -303,6 +303,37 @@ Following is the supported API format for network transformations: flowDirectionField: field providing the flow direction in the input entries; it will be rewritten ifDirectionField: interface-level field for flow direction, to create in output +## Transform Anomaly API +Following is the supported API format for anomaly detection transformations: + +
+ anomaly:
+         algorithm: (enum) algorithm used to score anomalies: ewma or zscore
+         valueField: field containing the numeric value to evaluate
+         keyFields: list of fields combined to build the per-entity baseline key
+         windowSize: number of recent samples to keep for baseline statistics
+         baselineWindow: minimum number of samples before anomaly scores are emitted
+         sensitivity: threshold multiplier for flagging anomalies (e.g., z-score)
+         ewmaAlpha: smoothing factor for ewma algorithm; derived from windowSize if omitted
+
+ +Example pipeline stage: + +
+  - name: anomaly
+    transform:
+      type: anomaly
+      anomaly:
+        algorithm: zscore
+        valueField: bytes
+        keyFields: [srcIP, dstIP, proto]
+        windowSize: 20
+        baselineWindow: 5
+        sensitivity: 3
+
+The anomaly stage appends `anomaly_score`, `anomaly_type`, and `baseline_window` to each flow record. Scores reflect the chosen +algorithm (EWMA or z-score); a warming-up period suppresses alerts until the baseline window is populated. Grafana users can +alert on `anomaly_type != "normal"` or threshold `anomaly_score` using the emitted fields. ## Write Loki API Following is the supported API format for writing to loki: diff --git a/hack/examples/pipeline-anomaly.yaml b/hack/examples/pipeline-anomaly.yaml new file mode 100644 index 000000000..b9d5365e5 --- /dev/null +++ b/hack/examples/pipeline-anomaly.yaml @@ -0,0 +1,27 @@ +--- +log-level: info +pipeline: + - name: ingest + - name: detect-anomaly + follows: ingest + - name: write + follows: detect-anomaly +parameters: + - name: ingest + ingest: + type: synthetic + synthetic: + flowLogsPerMin: 10 + - name: detect-anomaly + transform: + type: anomaly + anomaly: + algorithm: zscore + valueField: Bytes + keyFields: [SrcAddr, DstAddr, Proto] + windowSize: 20 + baselineWindow: 5 + sensitivity: 3 + - name: write + write: + type: stdout diff --git a/pkg/api/api.go b/pkg/api/api.go index 574d9f723..8d9b6cb99 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -40,6 +40,7 @@ const ( GenericType = "generic" NetworkType = "network" FilterType = "filter" + AnomalyType = "anomaly" ConnTrackType = "conntrack" NoneType = "none" @@ -60,6 +61,7 @@ type API struct { TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"` TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` + TransformAnomaly TransformAnomaly `yaml:"anomaly" doc:"## Transform Anomaly API\nFollowing is the supported API format for anomaly detection transformations:\n"` WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"` WriteIPFIX WriteIpfix `yaml:"ipfix" doc:"## Write IPFIX\nFollowing is the supported API format for writing to an IPFIX collector:\n"` diff --git a/pkg/api/transform_anomaly.go b/pkg/api/transform_anomaly.go new file mode 100644 index 000000000..dc1a6dcfa --- /dev/null +++ b/pkg/api/transform_anomaly.go @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +// TransformAnomalyAlgorithm defines the supported anomaly detection strategies. +// For doc generation, enum definitions must match format `Constant Type = "value" // doc` +type TransformAnomalyAlgorithm string + +const ( + AnomalyAlgorithmEWMA TransformAnomalyAlgorithm = "ewma" // exponentially weighted moving average baseline + AnomalyAlgorithmZScore TransformAnomalyAlgorithm = "zscore" // rolling z-score over a sliding window +) + +// TransformAnomaly describes configuration for anomaly detection stages. +type TransformAnomaly struct { + Algorithm TransformAnomalyAlgorithm `yaml:"algorithm,omitempty" json:"algorithm,omitempty" doc:"(enum) algorithm used to score anomalies: ewma or zscore"` + ValueField string `yaml:"valueField,omitempty" json:"valueField,omitempty" doc:"field containing the numeric value to evaluate"` + KeyFields []string `yaml:"keyFields,omitempty" json:"keyFields,omitempty" doc:"list of fields combined to build the per-entity baseline key"` + WindowSize int `yaml:"windowSize,omitempty" json:"windowSize,omitempty" doc:"number of recent samples to keep for baseline statistics"` + BaselineWindow int `yaml:"baselineWindow,omitempty" json:"baselineWindow,omitempty" doc:"minimum number of samples before anomaly scores are emitted"` + Sensitivity float64 `yaml:"sensitivity,omitempty" json:"sensitivity,omitempty" doc:"threshold multiplier for flagging anomalies (e.g., z-score)"` + EWMAAlpha float64 `yaml:"ewmaAlpha,omitempty" json:"ewmaAlpha,omitempty" doc:"smoothing factor for ewma algorithm; derived from windowSize if omitted"` +} diff --git a/pkg/config/config.go b/pkg/config/config.go index d907d2c8b..ed0fa0b90 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -119,6 +119,7 @@ type Transform struct { Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"` Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"` Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"` + Anomaly *api.TransformAnomaly `yaml:"anomaly,omitempty" json:"anomaly,omitempty"` } type Extract struct { diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go index c7c42cf3c..29581173d 100644 --- a/pkg/config/stage_params.go +++ b/pkg/config/stage_params.go @@ -51,6 +51,11 @@ func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}} } +//nolint:gocritic // hugeParam can be ignored: func only used at init +func NewTransformAnomalyParams(name string, an api.TransformAnomaly) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.AnomalyType, Anomaly: &an}} +} + //nolint:gocritic // hugeParam can be ignored: func only used at init func NewConnTrackParams(name string, ct api.ConnTrack) StageParam { return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}} diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 5b076458c..c4907e943 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -450,6 +450,8 @@ func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (t transformer, err = transform.NewTransformFilter(params) case api.NetworkType: transformer, err = transform.NewTransformNetwork(params, opMetrics) + case api.AnomalyType: + transformer, err = transform.NewTransformAnomaly(params, opMetrics) case api.NoneType: transformer, err = transform.NewTransformNone() default: diff --git a/pkg/pipeline/transform/transform.go b/pkg/pipeline/transform/transform.go index 5b51a41ae..76ab35239 100644 --- a/pkg/pipeline/transform/transform.go +++ b/pkg/pipeline/transform/transform.go @@ -45,6 +45,7 @@ type Definition struct { Type string Generic api.TransformGeneric Network api.TransformNetwork + Anomaly api.TransformAnomaly } type Definitions []Definition diff --git a/pkg/pipeline/transform/transform_anomaly.go b/pkg/pipeline/transform/transform_anomaly.go new file mode 100644 index 000000000..574f660fd --- /dev/null +++ b/pkg/pipeline/transform/transform_anomaly.go @@ -0,0 +1,243 @@ +/* + * Copyright (C) 2024 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package transform + +import ( + "fmt" + "math" + "strings" + "sync" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "github.com/sirupsen/logrus" +) + +const ( + defaultAnomalyWindow = 30 + defaultAnomalySensitivity = 3.0 +) + +var anomalyLog = logrus.WithField("component", "transform.Anomaly") + +type anomalyState struct { + values []float64 + sum float64 + sumSq float64 + baseline float64 + initialized bool +} + +func (s *anomalyState) addValue(v float64, window int) { + s.values = append(s.values, v) + s.sum += v + s.sumSq += v * v + if len(s.values) > window { + oldest := s.values[0] + s.values = s.values[1:] + s.sum -= oldest + s.sumSq -= oldest * oldest + } +} + +func (s *anomalyState) mean() float64 { + if len(s.values) == 0 { + return 0 + } + return s.sum / float64(len(s.values)) +} + +func (s *anomalyState) stddev() float64 { + if len(s.values) < 2 { + return 0 + } + mean := s.mean() + variance := (s.sumSq / float64(len(s.values))) - (mean * mean) + if variance < 0 { + variance = 0 + } + return math.Sqrt(variance) +} + +type Anomaly struct { + mu sync.Mutex + states map[string]*anomalyState + config api.TransformAnomaly + windowSize int + baselineWindow int + sensitivity float64 + alpha float64 + opMetrics *operational.Metrics +} + +// NewTransformAnomaly creates a new anomaly transformer. +func NewTransformAnomaly(params config.StageParam, opMetrics *operational.Metrics) (Transformer, error) { + anomalyConfig := api.TransformAnomaly{} + if params.Transform != nil && params.Transform.Anomaly != nil { + anomalyConfig = *params.Transform.Anomaly + } + if anomalyConfig.ValueField == "" { + return nil, fmt.Errorf("valueField must be provided for anomaly transform") + } + + window := anomalyConfig.WindowSize + if window <= 0 { + window = defaultAnomalyWindow + } + baselineWindow := anomalyConfig.BaselineWindow + if baselineWindow <= 0 { + baselineWindow = window / 2 + if baselineWindow == 0 { + baselineWindow = 1 + } + } + sensitivity := anomalyConfig.Sensitivity + if sensitivity <= 0 { + sensitivity = defaultAnomalySensitivity + } + alpha := anomalyConfig.EWMAAlpha + if alpha <= 0 { + alpha = 2.0 / (float64(window) + 1.0) + } + if len(anomalyConfig.KeyFields) == 0 { + anomalyConfig.KeyFields = []string{"SrcAddr", "DstAddr", "Proto"} + } + if anomalyConfig.Algorithm == "" { + anomalyConfig.Algorithm = api.AnomalyAlgorithmZScore + } + + anomalyLog.Infof("NewTransformAnomaly algorithm=%s window=%d baselineWindow=%d", anomalyConfig.Algorithm, window, baselineWindow) + return &Anomaly{ + states: make(map[string]*anomalyState), + config: anomalyConfig, + windowSize: window, + baselineWindow: baselineWindow, + sensitivity: sensitivity, + alpha: alpha, + opMetrics: opMetrics, + }, nil +} + +// Transform calculates anomaly scores per key and appends anomaly fields. +func (a *Anomaly) Transform(entry config.GenericMap) (config.GenericMap, bool) { + value, err := utils.ConvertToFloat64(entry[a.config.ValueField]) + if err != nil { + anomalyLog.Errorf("unable to convert %s to float: %v", a.config.ValueField, err) + return entry, false + } + key := a.buildKey(entry) + + a.mu.Lock() + state, ok := a.states[key] + if !ok { + state = &anomalyState{} + a.states[key] = state + } + anomalyType, score := a.score(state, value) + state.addValue(value, a.windowSize) + stateSize := len(state.values) + a.mu.Unlock() + + output := entry.Copy() + output["anomaly_score"] = score + output["anomaly_type"] = anomalyType + output["baseline_window"] = stateSize + + return output, true +} + +func (a *Anomaly) score(state *anomalyState, value float64) (string, float64) { + if len(state.values) < a.baselineWindow { + if !state.initialized { + state.baseline = value + state.initialized = true + } + return "warming_up", 0 + } + + switch a.config.Algorithm { + case api.AnomalyAlgorithmEWMA: + return a.scoreEWMA(state, value) + case api.AnomalyAlgorithmZScore: + fallthrough + default: + return a.scoreZScore(state, value) + } +} + +func (a *Anomaly) scoreEWMA(state *anomalyState, value float64) (string, float64) { + if !state.initialized { + state.baseline = value + state.initialized = true + } + deviation := value - state.baseline + stddev := state.stddev() + if stddev == 0 { + stddev = math.Max(math.Abs(state.baseline)*1e-6, 1e-9) + } + score := math.Abs(deviation) / stddev + state.baseline = state.baseline + a.alpha*(value-state.baseline) + anomalyType := "normal" + if score >= a.sensitivity { + if deviation > 0 { + anomalyType = "ewma_high" + } else { + anomalyType = "ewma_low" + } + } + return anomalyType, score +} + +func (a *Anomaly) scoreZScore(state *anomalyState, value float64) (string, float64) { + mean := state.mean() + stddev := state.stddev() + if stddev == 0 { + stddev = math.Max(math.Abs(mean)*1e-6, 1e-9) + } + score := math.Abs(value-mean) / stddev + anomalyType := "normal" + if score >= a.sensitivity { + if value > mean { + anomalyType = "zscore_high" + } else { + anomalyType = "zscore_low" + } + } + return anomalyType, score +} + +func (a *Anomaly) buildKey(entry config.GenericMap) string { + parts := make([]string, 0, len(a.config.KeyFields)) + for _, key := range a.config.KeyFields { + if val, ok := entry[key]; ok { + parts = append(parts, fmt.Sprint(val)) + } else { + parts = append(parts, "") + } + } + return strings.Join(parts, "|") +} + +// Reset clears the internal state; useful for tests. +func (a *Anomaly) Reset() { + a.mu.Lock() + defer a.mu.Unlock() + a.states = make(map[string]*anomalyState) +} diff --git a/pkg/pipeline/transform/transform_anomaly_test.go b/pkg/pipeline/transform/transform_anomaly_test.go new file mode 100644 index 000000000..b29f54631 --- /dev/null +++ b/pkg/pipeline/transform/transform_anomaly_test.go @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2024 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package transform + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const testConfigTransformAnomaly = `--- +log-level: debug +pipeline: + - name: anomaly1 +parameters: + - name: anomaly1 + transform: + type: anomaly + anomaly: + algorithm: zscore + valueField: bytes + keyFields: [srcIP, dstIP] + windowSize: 5 + baselineWindow: 3 + sensitivity: 2.5 +` + +func initNewTransformAnomaly(t *testing.T, configFile string) *Anomaly { + v, cfg := test.InitConfig(t, configFile) + require.NotNil(t, v) + + configParams := cfg.Parameters[0] + newTransform, err := NewTransformAnomaly(configParams, operational.NewMetrics(nil)) + require.NoError(t, err) + return newTransform.(*Anomaly) +} + +func TestNewTransformAnomalyConfig(t *testing.T) { + anomaly := initNewTransformAnomaly(t, testConfigTransformAnomaly) + require.Equal(t, 5, anomaly.windowSize) + require.Equal(t, 3, anomaly.baselineWindow) + require.Equal(t, 2.5, anomaly.sensitivity) + require.Equal(t, api.AnomalyAlgorithmZScore, anomaly.config.Algorithm) +} + +func TestTransformAnomalyZScore(t *testing.T) { + anomaly := initNewTransformAnomaly(t, testConfigTransformAnomaly) + baseFlow := func(val float64) config.GenericMap { + return config.GenericMap{"srcIP": "1.1.1.1", "dstIP": "2.2.2.2", "bytes": val} + } + // warmup period + for _, v := range []float64{100, 102, 98} { + out, ok := anomaly.Transform(baseFlow(v)) + require.True(t, ok) + assert.Equal(t, "warming_up", out["anomaly_type"]) + } + + normalOut, ok := anomaly.Transform(baseFlow(101)) + require.True(t, ok) + assert.Equal(t, "normal", normalOut["anomaly_type"]) + + anomalousOut, ok := anomaly.Transform(baseFlow(250)) + require.True(t, ok) + assert.NotEqual(t, "normal", anomalousOut["anomaly_type"]) + score := anomalousOut["anomaly_score"].(float64) + assert.GreaterOrEqual(t, score, anomaly.sensitivity) +} + +func TestTransformAnomalyReset(t *testing.T) { + anomaly := initNewTransformAnomaly(t, testConfigTransformAnomaly) + flow := config.GenericMap{"srcIP": "10.0.0.1", "dstIP": "2.2.2.2", "bytes": 50} + for i := 0; i < 3; i++ { + _, _ = anomaly.Transform(flow) + } + anomaly.Reset() + out, ok := anomaly.Transform(flow) + require.True(t, ok) + assert.Equal(t, "warming_up", out["anomaly_type"]) +} + +func BenchmarkTransformAnomaly(b *testing.B) { + anomaly := initNewTransformAnomaly(&testing.T{}, testConfigTransformAnomaly) + flow := config.GenericMap{"srcIP": "1.1.1.1", "dstIP": "2.2.2.2", "bytes": 100.0} + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = anomaly.Transform(flow) + } +}