Skip to content

Commit cb400ab

Browse files
committed
Add anomaly transform support
1 parent 2abf07c commit cb400ab

File tree

10 files changed

+456
-0
lines changed

10 files changed

+456
-0
lines changed

docs/api.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,37 @@ Following is the supported API format for network transformations:
303303
flowDirectionField: field providing the flow direction in the input entries; it will be rewritten
304304
ifDirectionField: interface-level field for flow direction, to create in output
305305
</pre>
306+
## Transform Anomaly API
307+
Following is the supported API format for anomaly detection transformations:
308+
309+
<pre>
310+
anomaly:
311+
algorithm: (enum) algorithm used to score anomalies: ewma or zscore
312+
valueField: field containing the numeric value to evaluate
313+
keyFields: list of fields combined to build the per-entity baseline key
314+
windowSize: number of recent samples to keep for baseline statistics
315+
baselineWindow: minimum number of samples before anomaly scores are emitted
316+
sensitivity: threshold multiplier for flagging anomalies (e.g., z-score)
317+
ewmaAlpha: smoothing factor for ewma algorithm; derived from windowSize if omitted
318+
</pre>
319+
320+
Example pipeline stage:
321+
322+
<pre>
323+
- name: anomaly
324+
transform:
325+
type: anomaly
326+
anomaly:
327+
algorithm: zscore
328+
valueField: bytes
329+
keyFields: [srcIP, dstIP, proto]
330+
windowSize: 20
331+
baselineWindow: 5
332+
sensitivity: 3
333+
</pre>
334+
The anomaly stage appends `anomaly_score`, `anomaly_type`, and `baseline_window` to each flow record. Scores reflect the chosen
335+
algorithm (EWMA or z-score); a warming-up period suppresses alerts until the baseline window is populated. Grafana users can
336+
alert on `anomaly_type` values other than "normal" and "warming_up", or threshold `anomaly_score` using the emitted fields.
306337
## Write Loki API
307338
Following is the supported API format for writing to loki:
308339

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
---
2+
log-level: info
3+
pipeline:
4+
- name: ingest
5+
- name: detect-anomaly
6+
- name: write
7+
parameters:
8+
- name: ingest
9+
ingest:
10+
type: synthetic
11+
synthetic:
12+
records: 10
13+
- name: detect-anomaly
14+
transform:
15+
type: anomaly
16+
anomaly:
17+
algorithm: zscore
18+
valueField: bytes
19+
keyFields: [srcIP, dstIP, proto]
20+
windowSize: 20
21+
baselineWindow: 5
22+
sensitivity: 3
23+
- name: write
24+
write:
25+
type: stdout

pkg/api/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
GenericType = "generic"
4141
NetworkType = "network"
4242
FilterType = "filter"
43+
AnomalyType = "anomaly"
4344
ConnTrackType = "conntrack"
4445
NoneType = "none"
4546

@@ -60,6 +61,7 @@ type API struct {
6061
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
6162
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
6263
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
64+
TransformAnomaly TransformAnomaly `yaml:"anomaly" doc:"## Transform Anomaly API\nFollowing is the supported API format for anomaly detection transformations:\n"`
6365
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
6466
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
6567
WriteIPFIX WriteIpfix `yaml:"ipfix" doc:"## Write IPFIX\nFollowing is the supported API format for writing to an IPFIX collector:\n"`

pkg/api/transform_anomaly.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (C) 2024 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
// TransformAnomalyAlgorithm names the scoring strategy used by an anomaly
// transform stage.
//
// For doc generation, enum definitions must match format `Constant Type = "value" // doc`
type TransformAnomalyAlgorithm string

// Supported values for TransformAnomalyAlgorithm. The trailing comment on each
// line is the text picked up by doc generation, so keep one constant per line.
const (
	AnomalyAlgorithmEWMA   TransformAnomalyAlgorithm = "ewma"   // exponentially weighted moving average baseline
	AnomalyAlgorithmZScore TransformAnomalyAlgorithm = "zscore" // rolling z-score over a sliding window
)
28+
29+
// TransformAnomaly describes configuration for anomaly detection stages.
30+
type TransformAnomaly struct {
31+
Algorithm TransformAnomalyAlgorithm `yaml:"algorithm,omitempty" json:"algorithm,omitempty" doc:"(enum) algorithm used to score anomalies: ewma or zscore"`
32+
ValueField string `yaml:"valueField,omitempty" json:"valueField,omitempty" doc:"field containing the numeric value to evaluate"`
33+
KeyFields []string `yaml:"keyFields,omitempty" json:"keyFields,omitempty" doc:"list of fields combined to build the per-entity baseline key"`
34+
WindowSize int `yaml:"windowSize,omitempty" json:"windowSize,omitempty" doc:"number of recent samples to keep for baseline statistics"`
35+
BaselineWindow int `yaml:"baselineWindow,omitempty" json:"baselineWindow,omitempty" doc:"minimum number of samples before anomaly scores are emitted"`
36+
Sensitivity float64 `yaml:"sensitivity,omitempty" json:"sensitivity,omitempty" doc:"threshold multiplier for flagging anomalies (e.g., z-score)"`
37+
EWMAAlpha float64 `yaml:"ewmaAlpha,omitempty" json:"ewmaAlpha,omitempty" doc:"smoothing factor for ewma algorithm; derived from windowSize if omitted"`
38+
}

pkg/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type Transform struct {
119119
Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"`
120120
Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"`
121121
Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"`
122+
Anomaly *api.TransformAnomaly `yaml:"anomaly,omitempty" json:"anomaly,omitempty"`
122123
}
123124

124125
type Extract struct {

pkg/config/stage_params.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam
5151
return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}
5252
}
5353

54+
//nolint:gocritic // hugeParam can be ignored: func only used at init
55+
func NewTransformAnomalyParams(name string, an api.TransformAnomaly) StageParam {
56+
return StageParam{Name: name, Transform: &Transform{Type: api.AnomalyType, Anomaly: &an}}
57+
}
58+
5459
//nolint:gocritic // hugeParam can be ignored: func only used at init
5560
func NewConnTrackParams(name string, ct api.ConnTrack) StageParam {
5661
return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}

pkg/pipeline/pipeline_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,8 @@ func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (t
450450
transformer, err = transform.NewTransformFilter(params)
451451
case api.NetworkType:
452452
transformer, err = transform.NewTransformNetwork(params, opMetrics)
453+
case api.AnomalyType:
454+
transformer, err = transform.NewTransformAnomaly(params, opMetrics)
453455
case api.NoneType:
454456
transformer, err = transform.NewTransformNone()
455457
default:

pkg/pipeline/transform/transform.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Definition struct {
4545
Type string
4646
Generic api.TransformGeneric
4747
Network api.TransformNetwork
48+
Anomaly api.TransformAnomaly
4849
}
4950

5051
type Definitions []Definition
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Copyright (C) 2024 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package transform
19+
20+
import (
21+
"fmt"
22+
"math"
23+
"strings"
24+
"sync"
25+
26+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
27+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
28+
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
29+
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
30+
"github.com/sirupsen/logrus"
31+
)
32+
33+
const (
34+
defaultAnomalyWindow = 30
35+
defaultAnomalySensitivity = 3.0
36+
)
37+
38+
var anomalyLog = logrus.WithField("component", "transform.Anomaly")
39+
40+
// anomalyState holds the sliding window of recent samples for one key together
// with the EWMA baseline maintained by the Anomaly scoring methods.
type anomalyState struct {
	values      []float64 // retained samples, oldest first; bounded by addValue
	sum         float64   // running sum of values, so that mean() is O(1)
	baseline    float64   // EWMA baseline; not touched by the methods below
	initialized bool      // true once baseline has been seeded
}

// addValue appends v to the window and drops the oldest sample once more than
// window samples are held.
//
// Non-finite samples (NaN, +Inf, -Inf) are ignored: once added to the running
// sum they can never be subtracted back out, so a single one would corrupt the
// statistics of this key for good.
func (s *anomalyState) addValue(v float64, window int) {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return
	}
	s.values = append(s.values, v)
	s.sum += v
	if len(s.values) > window {
		s.sum -= s.values[0]
		s.values = s.values[1:]
	}
}

// mean returns the arithmetic mean of the retained samples, or 0 when the
// window is empty.
func (s *anomalyState) mean() float64 {
	if len(s.values) == 0 {
		return 0
	}
	return s.sum / float64(len(s.values))
}

// stddev returns the population standard deviation of the retained samples,
// or 0 when fewer than two samples are held.
//
// The variance is computed in one pass over the window on data shifted by the
// oldest sample. The textbook E[x^2] - E[x]^2 form cancels catastrophically
// when the values are large compared to their spread (e.g. byte counters
// around 1e9 that differ by a few units); shifting keeps the accumulated terms
// on the scale of the spread. A constant window yields exactly 0 because every
// shifted term is 0. Cost is O(len(values)) per call.
func (s *anomalyState) stddev() float64 {
	if len(s.values) < 2 {
		return 0
	}
	n := float64(len(s.values))
	shift := s.values[0]
	var sumD, sumD2 float64
	for _, v := range s.values {
		d := v - shift
		sumD += d
		sumD2 += d * d
	}
	variance := (sumD2 - sumD*sumD/n) / n
	if variance < 0 {
		// Rounding can push a near-zero variance slightly below zero.
		variance = 0
	}
	return math.Sqrt(variance)
}
78+
79+
type Anomaly struct {
80+
mu sync.Mutex
81+
states map[string]*anomalyState
82+
config api.TransformAnomaly
83+
windowSize int
84+
baselineWindow int
85+
sensitivity float64
86+
alpha float64
87+
opMetrics *operational.Metrics
88+
}
89+
90+
// NewTransformAnomaly creates a new anomaly transformer.
91+
func NewTransformAnomaly(params config.StageParam, opMetrics *operational.Metrics) (Transformer, error) {
92+
anomalyConfig := api.TransformAnomaly{}
93+
if params.Transform != nil && params.Transform.Anomaly != nil {
94+
anomalyConfig = *params.Transform.Anomaly
95+
}
96+
if anomalyConfig.ValueField == "" {
97+
return nil, fmt.Errorf("valueField must be provided for anomaly transform")
98+
}
99+
100+
window := anomalyConfig.WindowSize
101+
if window <= 0 {
102+
window = defaultAnomalyWindow
103+
}
104+
baselineWindow := anomalyConfig.BaselineWindow
105+
if baselineWindow <= 0 {
106+
baselineWindow = window / 2
107+
if baselineWindow == 0 {
108+
baselineWindow = 1
109+
}
110+
}
111+
sensitivity := anomalyConfig.Sensitivity
112+
if sensitivity <= 0 {
113+
sensitivity = defaultAnomalySensitivity
114+
}
115+
alpha := anomalyConfig.EWMAAlpha
116+
if alpha <= 0 {
117+
alpha = 2.0 / (float64(window) + 1.0)
118+
}
119+
if len(anomalyConfig.KeyFields) == 0 {
120+
anomalyConfig.KeyFields = []string{"SrcAddr", "DstAddr", "Proto"}
121+
}
122+
if anomalyConfig.Algorithm == "" {
123+
anomalyConfig.Algorithm = api.AnomalyAlgorithmZScore
124+
}
125+
126+
anomalyLog.Infof("NewTransformAnomaly algorithm=%s window=%d baselineWindow=%d", anomalyConfig.Algorithm, window, baselineWindow)
127+
return &Anomaly{
128+
states: make(map[string]*anomalyState),
129+
config: anomalyConfig,
130+
windowSize: window,
131+
baselineWindow: baselineWindow,
132+
sensitivity: sensitivity,
133+
alpha: alpha,
134+
opMetrics: opMetrics,
135+
}, nil
136+
}
137+
138+
// Transform calculates anomaly scores per key and appends anomaly fields.
139+
func (a *Anomaly) Transform(entry config.GenericMap) (config.GenericMap, bool) {
140+
value, err := utils.ConvertToFloat64(entry[a.config.ValueField])
141+
if err != nil {
142+
anomalyLog.Errorf("unable to convert %s to float: %v", a.config.ValueField, err)
143+
return entry, false
144+
}
145+
key := a.buildKey(entry)
146+
147+
a.mu.Lock()
148+
state, ok := a.states[key]
149+
if !ok {
150+
state = &anomalyState{}
151+
a.states[key] = state
152+
}
153+
anomalyType, score := a.score(state, value)
154+
state.addValue(value, a.windowSize)
155+
stateSize := len(state.values)
156+
a.mu.Unlock()
157+
158+
output := entry.Copy()
159+
output["anomaly_score"] = score
160+
output["anomaly_type"] = anomalyType
161+
output["baseline_window"] = stateSize
162+
163+
return output, true
164+
}
165+
166+
func (a *Anomaly) score(state *anomalyState, value float64) (string, float64) {
167+
if len(state.values) < a.baselineWindow {
168+
if !state.initialized {
169+
state.baseline = value
170+
state.initialized = true
171+
}
172+
return "warming_up", 0
173+
}
174+
175+
switch a.config.Algorithm {
176+
case api.AnomalyAlgorithmEWMA:
177+
return a.scoreEWMA(state, value)
178+
case api.AnomalyAlgorithmZScore:
179+
fallthrough
180+
default:
181+
return a.scoreZScore(state, value)
182+
}
183+
}
184+
185+
func (a *Anomaly) scoreEWMA(state *anomalyState, value float64) (string, float64) {
186+
if !state.initialized {
187+
state.baseline = value
188+
state.initialized = true
189+
}
190+
deviation := value - state.baseline
191+
stddev := state.stddev()
192+
if stddev == 0 {
193+
stddev = math.Max(math.Abs(state.baseline)*1e-6, 1e-9)
194+
}
195+
score := math.Abs(deviation) / stddev
196+
state.baseline = state.baseline + a.alpha*(value-state.baseline)
197+
anomalyType := "normal"
198+
if score >= a.sensitivity {
199+
if deviation > 0 {
200+
anomalyType = "ewma_high"
201+
} else {
202+
anomalyType = "ewma_low"
203+
}
204+
}
205+
return anomalyType, score
206+
}
207+
208+
func (a *Anomaly) scoreZScore(state *anomalyState, value float64) (string, float64) {
209+
mean := state.mean()
210+
stddev := state.stddev()
211+
if stddev == 0 {
212+
stddev = math.Max(math.Abs(mean)*1e-6, 1e-9)
213+
}
214+
score := math.Abs(value-mean) / stddev
215+
anomalyType := "normal"
216+
if score >= a.sensitivity {
217+
if value > mean {
218+
anomalyType = "zscore_high"
219+
} else {
220+
anomalyType = "zscore_low"
221+
}
222+
}
223+
return anomalyType, score
224+
}
225+
226+
func (a *Anomaly) buildKey(entry config.GenericMap) string {
227+
parts := make([]string, 0, len(a.config.KeyFields))
228+
for _, key := range a.config.KeyFields {
229+
if val, ok := entry[key]; ok {
230+
parts = append(parts, fmt.Sprint(val))
231+
} else {
232+
parts = append(parts, "<missing>")
233+
}
234+
}
235+
return strings.Join(parts, "|")
236+
}
237+
238+
// Reset clears the internal state; useful for tests.
239+
func (a *Anomaly) Reset() {
240+
a.mu.Lock()
241+
defer a.mu.Unlock()
242+
a.states = make(map[string]*anomalyState)
243+
}

0 commit comments

Comments
 (0)