Skip to content

Commit 5154759

Browse files
committed
NETOBSERV-1692: allow KEEP filtering logic
1 parent 5fba26a commit 5154759

File tree

4 files changed

+315
-23
lines changed

4 files changed

+315
-23
lines changed

docs/api.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ Following is the supported API format for filter transformations:
162162
remove_entry_if_equal: removes the entry if the field value equals specified value
163163
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
164164
remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
165+
keep_entry: keeps the entry if the set of rules are all satisfied
165166
add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
166167
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
167168
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
@@ -187,6 +188,19 @@ Following is the supported API format for filter transformations:
187188
input: entry input field
188189
value: specified value of input field:
189190
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
191+
keepEntryAllSatisfied: configuration for keep_entry rule
192+
type: (enum) one of the following:
193+
keep_entry_if_exists: keeps the entry if the field exists
194+
keep_entry_if_doesnt_exist: keeps the entry if the field does not exist
195+
keep_entry_if_equal: keeps the entry if the field value equals specified value
196+
keep_entry_if_not_equal: keeps the entry if the field value does not equal specified value
197+
keep_entry_if_regex_match: keeps the entry if the field value matches the specified regex
198+
keep_entry_if_not_regex_match: keeps the entry if the field value does not match the specified regex
199+
keepEntry: configuration for keep_entry_* rules
200+
input: entry input field
201+
value: specified value of input field:
202+
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
203+
keepEntrySampling: sampling value for keep_entry type: 1 flow on <sampling> is kept
190204
addField: configuration for add_field rule
191205
input: entry input field
192206
value: specified value of input field:

pkg/api/transform_filter.go

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,23 @@
1717

1818
package api
1919

20+
import (
21+
"errors"
22+
"regexp"
23+
)
24+
2025
type TransformFilter struct {
2126
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
2227
}
2328

24-
func (tf *TransformFilter) Preprocess() {
29+
func (tf *TransformFilter) Preprocess() error {
30+
var errs []error
2531
for i := range tf.Rules {
26-
tf.Rules[i].preprocess()
32+
if err := tf.Rules[i].preprocess(); err != nil {
33+
errs = append(errs, err)
34+
}
2735
}
36+
return errors.Join(errs...)
2837
}
2938

3039
type TransformFilterEnum string
@@ -37,6 +46,7 @@ const (
3746
RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
3847
RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
3948
RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied
49+
KeepEntry TransformFilterEnum = "keep_entry" // keeps the entry if the set of rules are all satisfied
4050
AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
4151
AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist
4252
AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field
@@ -55,11 +65,24 @@ const (
5565
RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
5666
)
5767

68+
type TransformFilterKeepEntryEnum string
69+
70+
const (
71+
KeepEntryIfExists TransformFilterKeepEntryEnum = "keep_entry_if_exists" // keeps the entry if the field exists
72+
KeepEntryIfDoesntExist TransformFilterKeepEntryEnum = "keep_entry_if_doesnt_exist" // keeps the entry if the field does not exist
73+
KeepEntryIfEqual TransformFilterKeepEntryEnum = "keep_entry_if_equal" // keeps the entry if the field value equals specified value
74+
KeepEntryIfNotEqual TransformFilterKeepEntryEnum = "keep_entry_if_not_equal" // keeps the entry if the field value does not equal specified value
75+
KeepEntryIfRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_regex_match" // keeps the entry if the field value matches the specified regex
76+
KeepEntryIfNotRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_not_regex_match" // keeps the entry if the field value does not match the specified regex
77+
)
78+
5879
type TransformFilterRule struct {
5980
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
6081
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
6182
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
6283
RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
84+
KeepEntryAllSatisfied []*KeepEntryRule `yaml:"keepEntryAllSatisfied,omitempty" json:"keepEntryAllSatisfied,omitempty" doc:"configuration for keep_entry rule"`
85+
KeepEntrySampling uint16 `yaml:"keepEntrySampling,omitempty" json:"keepEntrySampling,omitempty" doc:"sampling value for keep_entry type: 1 flow on <sampling> is kept"`
6386
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
6487
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
6588
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
@@ -69,19 +92,35 @@ type TransformFilterRule struct {
6992
ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
7093
}
7194

72-
func (r *TransformFilterRule) preprocess() {
95+
func (r *TransformFilterRule) preprocess() error {
96+
var errs []error
7397
if r.RemoveField != nil {
74-
r.RemoveField.preprocess()
98+
if err := r.RemoveField.preprocess(false); err != nil {
99+
errs = append(errs, err)
100+
}
75101
}
76102
if r.RemoveEntry != nil {
77-
r.RemoveEntry.preprocess()
103+
if err := r.RemoveEntry.preprocess(false); err != nil {
104+
errs = append(errs, err)
105+
}
78106
}
79107
for i := range r.RemoveEntryAllSatisfied {
80-
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
108+
if err := r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess(false); err != nil {
109+
errs = append(errs, err)
110+
}
111+
}
112+
for i := range r.KeepEntryAllSatisfied {
113+
err := r.KeepEntryAllSatisfied[i].KeepEntry.preprocess(r.KeepEntryAllSatisfied[i].Type == KeepEntryIfRegexMatch || r.KeepEntryAllSatisfied[i].Type == KeepEntryIfNotRegexMatch)
114+
if err != nil {
115+
errs = append(errs, err)
116+
}
81117
}
82118
for i := range r.ConditionalSampling {
83-
r.ConditionalSampling[i].preprocess()
119+
if err := r.ConditionalSampling[i].preprocess(); err != nil {
120+
errs = append(errs, err)
121+
}
84122
}
123+
return errors.Join(errs...)
85124
}
86125

87126
type TransformFilterGenericRule struct {
@@ -90,12 +129,25 @@ type TransformFilterGenericRule struct {
90129
CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
91130
}
92131

93-
func (r *TransformFilterGenericRule) preprocess() {
94-
if r.CastInt {
132+
func (r *TransformFilterGenericRule) preprocess(isRegex bool) error {
133+
if isRegex {
134+
if s, ok := r.Value.(string); ok {
135+
v, err := regexp.Compile(s)
136+
if err != nil {
137+
r.Value = nil
138+
return err
139+
}
140+
r.Value = v
141+
} else {
142+
r.Value = nil
143+
return errors.New("regex filter expects string value")
144+
}
145+
} else if r.CastInt {
95146
if f, ok := r.Value.(float64); ok {
96147
r.Value = int(f)
97148
}
98149
}
150+
return nil
99151
}
100152

101153
type TransformFilterRuleWithAssignee struct {
@@ -110,13 +162,22 @@ type RemoveEntryRule struct {
110162
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
111163
}
112164

165+
type KeepEntryRule struct {
166+
Type TransformFilterKeepEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
167+
KeepEntry *TransformFilterGenericRule `yaml:"keepEntry,omitempty" json:"keepEntry,omitempty" doc:"configuration for keep_entry_* rules"`
168+
}
169+
113170
type SamplingCondition struct {
114171
Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
115172
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
116173
}
117174

118-
func (s *SamplingCondition) preprocess() {
175+
func (s *SamplingCondition) preprocess() error {
176+
var errs []error
119177
for i := range s.Rules {
120-
s.Rules[i].RemoveEntry.preprocess()
178+
if err := s.Rules[i].RemoveEntry.preprocess(false); err != nil {
179+
errs = append(errs, err)
180+
}
121181
}
182+
return errors.Join(errs...)
122183
}

pkg/pipeline/transform/transform_filter.go

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,28 @@ var (
3737
)
3838

3939
type Filter struct {
40-
Rules []api.TransformFilterRule
40+
Rules []api.TransformFilterRule
41+
KeepRules []api.TransformFilterRule
4142
}
4243

4344
// Transform transforms a flow; if false is returned as a second argument, the entry is dropped
4445
func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
4546
tlog.Tracef("f = %v", f)
4647
outputEntry := entry.Copy()
4748
labels := make(map[string]string)
49+
if len(f.KeepRules) > 0 {
50+
keep := false
51+
for i := range f.KeepRules {
52+
tlog.Tracef("keep rule = %v", f.KeepRules[i])
53+
if applyRule(outputEntry, labels, &f.KeepRules[i]) {
54+
keep = true
55+
break
56+
}
57+
}
58+
if !keep {
59+
return nil, false
60+
}
61+
}
4862
for i := range f.Rules {
4963
tlog.Tracef("rule = %v", f.Rules[i])
5064
if cont := applyRule(outputEntry, labels, &f.Rules[i]); !cont {
@@ -143,6 +157,8 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
143157
return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied)
144158
case api.ConditionalSampling:
145159
return sample(entry, rule.ConditionalSampling)
160+
case api.KeepEntry:
161+
return rollSampling(rule.KeepEntrySampling) && isKeepEntrySatisfied(entry, rule.KeepEntryAllSatisfied)
146162
default:
147163
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
148164
}
@@ -159,25 +175,91 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
159175
return true
160176
}
161177

178+
func isKeepEntrySatisfied(entry config.GenericMap, rules []*api.KeepEntryRule) bool {
179+
for _, r := range rules {
180+
val, ok := entry[r.KeepEntry.Input]
181+
switch r.Type {
182+
case api.KeepEntryIfExists:
183+
if !ok {
184+
return false
185+
}
186+
case api.KeepEntryIfDoesntExist:
187+
if ok {
188+
return false
189+
}
190+
case api.KeepEntryIfEqual:
191+
if !ok || val != r.KeepEntry.Value {
192+
return false
193+
}
194+
case api.KeepEntryIfNotEqual:
195+
if ok && val == r.KeepEntry.Value {
196+
return false
197+
}
198+
case api.KeepEntryIfRegexMatch:
199+
if ok {
200+
match, ok := checkRegex(r.KeepEntry.Value, val)
201+
if !ok || !match {
202+
return false
203+
}
204+
} else {
205+
return false
206+
}
207+
case api.KeepEntryIfNotRegexMatch:
208+
if ok {
209+
match, ok := checkRegex(r.KeepEntry.Value, val)
210+
if !ok || match {
211+
return false
212+
}
213+
} else {
214+
return false
215+
}
216+
}
217+
}
218+
return true
219+
}
220+
221+
// Returns (valid, match)
222+
func checkRegex(maybeReg any, value any) (bool, bool) {
223+
reg, ok := maybeReg.(*regexp.Regexp)
224+
if !ok {
225+
return false, false
226+
}
227+
return true, reg.MatchString(utils.ConvertToString(value))
228+
}
229+
162230
func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
163231
for _, r := range rules {
164232
if isRemoveEntrySatisfied(entry, r.Rules) {
165-
return r.Value == 0 || (rndgen.Intn(int(r.Value)) == 0)
233+
return rollSampling(r.Value)
166234
}
167235
}
168236
return true
169237
}
170238

239+
func rollSampling(value uint16) bool {
240+
return value == 0 || (rndgen.Intn(int(value)) == 0)
241+
}
242+
171243
// NewTransformFilter create a new filter transform
172244
func NewTransformFilter(params config.StageParam) (Transformer, error) {
173245
tlog.Debugf("entering NewTransformFilter")
246+
keepRules := []api.TransformFilterRule{}
174247
rules := []api.TransformFilterRule{}
175248
if params.Transform != nil && params.Transform.Filter != nil {
176-
params.Transform.Filter.Preprocess()
177-
rules = params.Transform.Filter.Rules
249+
if err := params.Transform.Filter.Preprocess(); err != nil {
250+
return nil, err
251+
}
252+
for i := range params.Transform.Filter.Rules {
253+
if params.Transform.Filter.Rules[i].Type == api.KeepEntry {
254+
keepRules = append(keepRules, params.Transform.Filter.Rules[i])
255+
} else {
256+
rules = append(rules, params.Transform.Filter.Rules[i])
257+
}
258+
}
178259
}
179260
transformFilter := &Filter{
180-
Rules: rules,
261+
Rules: rules,
262+
KeepRules: keepRules,
181263
}
182264
return transformFilter, nil
183265
}

0 commit comments

Comments
 (0)