diff --git a/pkg/pipeline/encode/metrics/filtering.go b/pkg/pipeline/encode/metrics/filtering.go index 885d4ae2a..b6a2a12c7 100644 --- a/pkg/pipeline/encode/metrics/filtering.go +++ b/pkg/pipeline/encode/metrics/filtering.go @@ -4,15 +4,32 @@ import "github.com/netobserv/flowlogs-pipeline/pkg/config" func (p *Preprocessed) ApplyFilters(flow config.GenericMap, flatParts []config.GenericMap) (bool, []config.GenericMap) { filteredParts := flatParts - for _, filter := range p.filters { - if filter.useFlat { - filteredParts = filter.filterFlatParts(filteredParts) - if len(filteredParts) == 0 { - return false, nil + // For a given key, all related filters are OR'ed + for _, filtersPerKey := range p.filters { + allFailed := true + for _, filter := range filtersPerKey { + passed, nfp := applySingleFilter(flow, &filter, filteredParts) + if passed { + allFailed = false + filteredParts = nfp + break } - } else if !filter.predicate(flow) { + } + if allFailed { + return false, nil + } + } + return true, filteredParts +} + +func applySingleFilter(flow config.GenericMap, filter *preprocessedFilter, filteredParts []config.GenericMap) (bool, []config.GenericMap) { + if filter.useFlat { + filteredParts = filter.filterFlatParts(filteredParts) + if len(filteredParts) == 0 { return false, nil } + } else if !filter.predicate(flow) { + return false, nil } return true, filteredParts } diff --git a/pkg/pipeline/encode/metrics/preprocess.go b/pkg/pipeline/encode/metrics/preprocess.go index 5aabc8311..e1a87becf 100644 --- a/pkg/pipeline/encode/metrics/preprocess.go +++ b/pkg/pipeline/encode/metrics/preprocess.go @@ -10,7 +10,7 @@ import ( type Preprocessed struct { *api.MetricsItem - filters []preprocessedFilter + filters map[string][]preprocessedFilter MappedLabels []MappedLabel FlattenedLabels []MappedLabel } @@ -60,6 +60,7 @@ func filterToPredicate(filter api.MetricsFilter) filters.Predicate { func Preprocess(def *api.MetricsItem) *Preprocessed { mi := Preprocessed{ MetricsItem: def, + filters: make(map[string][]preprocessedFilter), } for _, l := range def.Labels { ml := MappedLabel{Source: l, Target: l} @@ -73,7 +74,7 @@ func Preprocess(def *api.MetricsItem) *Preprocessed { } } for _, f := range def.Filters { - mi.filters = append(mi.filters, preprocessedFilter{ + mi.filters[f.Key] = append(mi.filters[f.Key], preprocessedFilter{ predicate: filterToPredicate(f), useFlat: mi.isFlattened(f.Key), }) diff --git a/pkg/pipeline/encode/metrics/preprocess_test.go b/pkg/pipeline/encode/metrics/preprocess_test.go index 3a5fe8a13..42a71d6b8 100644 --- a/pkg/pipeline/encode/metrics/preprocess_test.go +++ b/pkg/pipeline/encode/metrics/preprocess_test.go @@ -42,3 +42,24 @@ func Test_Flatten(t *testing.T) { }, }, fl) } + +func Test_ORedFilters(t *testing.T) { + // Several filters on the same key are ORed + pp := Preprocess(&api.MetricsItem{Filters: []api.MetricsFilter{ + { + Key: "label", + Type: api.MetricFilterAbsence, + }, + { + Key: "label", + Type: api.MetricFilterRegex, + Value: "^EXT-.*", + }, + }}) + keep, _ := pp.ApplyFilters(config.GenericMap{"namespace": "A", "bytes": 7, "label": "Something"}, nil) + assert.False(t, keep) + keep, _ = pp.ApplyFilters(config.GenericMap{"namespace": "A", "bytes": 7, "label": "EXT-Something"}, nil) + assert.True(t, keep) + keep, _ = pp.ApplyFilters(config.GenericMap{"namespace": "A", "bytes": 7}, nil) + assert.True(t, keep) +}