Skip to content

Commit c10119c

Browse files
committed
Share filter predicate code (prom+transform)
Move prom-encode predicates filtering code to its own package and share it with "keep_entry" transforms
1 parent ec1624b commit c10119c

File tree

6 files changed

+338
-122
lines changed

6 files changed

+338
-122
lines changed

pkg/api/transform_filter.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,14 @@
1717

1818
package api
1919

20-
import (
21-
"errors"
22-
"regexp"
23-
)
24-
2520
type TransformFilter struct {
2621
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
2722
}
2823

29-
func (tf *TransformFilter) Preprocess() error {
30-
var errs []error
24+
func (tf *TransformFilter) Preprocess() {
3125
for i := range tf.Rules {
32-
if err := tf.Rules[i].preprocess(); err != nil {
33-
errs = append(errs, err)
34-
}
26+
tf.Rules[i].preprocess()
3527
}
36-
return errors.Join(errs...)
3728
}
3829

3930
type TransformFilterEnum string
@@ -92,35 +83,22 @@ type TransformFilterRule struct {
9283
ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
9384
}
9485

95-
func (r *TransformFilterRule) preprocess() error {
96-
var errs []error
86+
func (r *TransformFilterRule) preprocess() {
9787
if r.RemoveField != nil {
98-
if err := r.RemoveField.preprocess(false); err != nil {
99-
errs = append(errs, err)
100-
}
88+
r.RemoveField.preprocess()
10189
}
10290
if r.RemoveEntry != nil {
103-
if err := r.RemoveEntry.preprocess(false); err != nil {
104-
errs = append(errs, err)
105-
}
91+
r.RemoveEntry.preprocess()
10692
}
10793
for i := range r.RemoveEntryAllSatisfied {
108-
if err := r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess(false); err != nil {
109-
errs = append(errs, err)
110-
}
94+
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
11195
}
11296
for i := range r.KeepEntryAllSatisfied {
113-
err := r.KeepEntryAllSatisfied[i].KeepEntry.preprocess(r.KeepEntryAllSatisfied[i].Type == KeepEntryIfRegexMatch || r.KeepEntryAllSatisfied[i].Type == KeepEntryIfNotRegexMatch)
114-
if err != nil {
115-
errs = append(errs, err)
116-
}
97+
r.KeepEntryAllSatisfied[i].KeepEntry.preprocess()
11798
}
11899
for i := range r.ConditionalSampling {
119-
if err := r.ConditionalSampling[i].preprocess(); err != nil {
120-
errs = append(errs, err)
121-
}
100+
r.ConditionalSampling[i].preprocess()
122101
}
123-
return errors.Join(errs...)
124102
}
125103

126104
type TransformFilterGenericRule struct {
@@ -129,25 +107,12 @@ type TransformFilterGenericRule struct {
129107
CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
130108
}
131109

132-
func (r *TransformFilterGenericRule) preprocess(isRegex bool) error {
133-
if isRegex {
134-
if s, ok := r.Value.(string); ok {
135-
v, err := regexp.Compile(s)
136-
if err != nil {
137-
r.Value = nil
138-
return err
139-
}
140-
r.Value = v
141-
} else {
142-
r.Value = nil
143-
return errors.New("regex filter expects string value")
144-
}
145-
} else if r.CastInt {
110+
func (r *TransformFilterGenericRule) preprocess() {
111+
if r.CastInt {
146112
if f, ok := r.Value.(float64); ok {
147113
r.Value = int(f)
148114
}
149115
}
150-
return nil
151116
}
152117

153118
type TransformFilterRuleWithAssignee struct {
@@ -172,12 +137,8 @@ type SamplingCondition struct {
172137
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
173138
}
174139

175-
func (s *SamplingCondition) preprocess() error {
176-
var errs []error
140+
func (s *SamplingCondition) preprocess() {
177141
for i := range s.Rules {
178-
if err := s.Rules[i].RemoveEntry.preprocess(false); err != nil {
179-
errs = append(errs, err)
180-
}
142+
s.Rules[i].RemoveEntry.preprocess()
181143
}
182-
return errors.Join(errs...)
183144
}

pkg/pipeline/encode/metrics/preprocess.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@ import (
88
"github.com/netobserv/flowlogs-pipeline/pkg/api"
99
"github.com/netobserv/flowlogs-pipeline/pkg/config"
1010
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
11+
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
1112
)
1213

13-
type Predicate func(config.GenericMap) bool
14-
15-
var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`)
16-
1714
type Preprocessed struct {
1815
*api.MetricsItem
1916
filters []preprocessedFilter
@@ -99,17 +96,19 @@ func NotRegex(filter api.MetricsFilter) Predicate {
9996
func filterToPredicate(filter api.MetricsFilter) Predicate {
10097
switch filter.Type {
10198
case api.MetricFilterEqual:
102-
return Equal(filter)
99+
return filters.Equal(filter.Key, filter.Value, true)
103100
case api.MetricFilterNotEqual:
104-
return NotEqual(filter)
101+
return filters.NotEqual(filter.Key, filter.Value, true)
105102
case api.MetricFilterPresence:
106-
return Presence(filter)
103+
return filters.Presence(filter.Key)
107104
case api.MetricFilterAbsence:
108-
return Absence(filter)
105+
return filters.Absence(filter.Key)
109106
case api.MetricFilterRegex:
110-
return Regex(filter)
107+
r, _ := regexp.Compile(filter.Value)
108+
return filters.Regex(filter.Key, r)
111109
case api.MetricFilterNotRegex:
112-
return NotRegex(filter)
110+
r, _ := regexp.Compile(filter.Value)
111+
return filters.NotRegex(filter.Key, r)
113112
}
114113
// Default = Exact
115114
return Equal(filter)

pkg/pipeline/transform/transform_filter.go

Lines changed: 32 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2929
"github.com/netobserv/flowlogs-pipeline/pkg/config"
3030
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
31+
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
3132
"github.com/sirupsen/logrus"
3233
)
3334

@@ -38,7 +39,12 @@ var (
3839

3940
type Filter struct {
4041
Rules []api.TransformFilterRule
41-
KeepRules []api.TransformFilterRule
42+
KeepRules []predicatesRule
43+
}
44+
45+
type predicatesRule struct {
46+
predicates []filters.Predicate
47+
sampling uint16
4248
}
4349

4450
// Transform transforms a flow; if false is returned as a second argument, the entry is dropped
@@ -48,9 +54,8 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
4854
labels := make(map[string]string)
4955
if len(f.KeepRules) > 0 {
5056
keep := false
51-
for i := range f.KeepRules {
52-
tlog.Tracef("keep rule = %v", f.KeepRules[i])
53-
if applyRule(outputEntry, labels, &f.KeepRules[i]) {
57+
for _, r := range f.KeepRules {
58+
if applyPredicates(outputEntry, r) {
5459
keep = true
5560
break
5661
}
@@ -158,7 +163,8 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
158163
case api.ConditionalSampling:
159164
return sample(entry, rule.ConditionalSampling)
160165
case api.KeepEntryAllSatisfied:
161-
return rollSampling(rule.KeepEntrySampling) && isKeepEntrySatisfied(entry, rule.KeepEntryAllSatisfied)
166+
// This should be processed only in "applyPredicates". Failure to do so is a bug.
167+
tlog.Panicf("unexpected KeepEntryAllSatisfied: %v", rule)
162168
default:
163169
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
164170
}
@@ -175,58 +181,18 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
175181
return true
176182
}
177183

178-
func isKeepEntrySatisfied(entry config.GenericMap, rules []*api.KeepEntryRule) bool {
179-
for _, r := range rules {
180-
val, ok := entry[r.KeepEntry.Input]
181-
switch r.Type {
182-
case api.KeepEntryIfExists:
183-
if !ok {
184-
return false
185-
}
186-
case api.KeepEntryIfDoesntExist:
187-
if ok {
188-
return false
189-
}
190-
case api.KeepEntryIfEqual:
191-
if !ok || val != r.KeepEntry.Value {
192-
return false
193-
}
194-
case api.KeepEntryIfNotEqual:
195-
if ok && val == r.KeepEntry.Value {
196-
return false
197-
}
198-
case api.KeepEntryIfRegexMatch:
199-
if ok {
200-
match, ok := checkRegex(r.KeepEntry.Value, val)
201-
if !ok || !match {
202-
return false
203-
}
204-
} else {
205-
return false
206-
}
207-
case api.KeepEntryIfNotRegexMatch:
208-
if ok {
209-
match, ok := checkRegex(r.KeepEntry.Value, val)
210-
if !ok || match {
211-
return false
212-
}
213-
} else {
214-
return false
215-
}
184+
func applyPredicates(entry config.GenericMap, rule predicatesRule) bool {
185+
if !rollSampling(rule.sampling) {
186+
return false
187+
}
188+
for _, p := range rule.predicates {
189+
if !p(entry) {
190+
return false
216191
}
217192
}
218193
return true
219194
}
220195

221-
// Returns (valid, match)
222-
func checkRegex(maybeReg any, value any) (bool, bool) {
223-
reg, ok := maybeReg.(*regexp.Regexp)
224-
if !ok {
225-
return false, false
226-
}
227-
return true, reg.MatchString(utils.ConvertToString(value))
228-
}
229-
230196
func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
231197
for _, r := range rules {
232198
if isRemoveEntrySatisfied(entry, r.Rules) {
@@ -243,17 +209,24 @@ func rollSampling(value uint16) bool {
243209
// NewTransformFilter create a new filter transform
244210
func NewTransformFilter(params config.StageParam) (Transformer, error) {
245211
tlog.Debugf("entering NewTransformFilter")
246-
keepRules := []api.TransformFilterRule{}
212+
keepRules := []predicatesRule{}
247213
rules := []api.TransformFilterRule{}
248214
if params.Transform != nil && params.Transform.Filter != nil {
249-
if err := params.Transform.Filter.Preprocess(); err != nil {
250-
return nil, err
251-
}
215+
params.Transform.Filter.Preprocess()
252216
for i := range params.Transform.Filter.Rules {
253-
if params.Transform.Filter.Rules[i].Type == api.KeepEntryAllSatisfied {
254-
keepRules = append(keepRules, params.Transform.Filter.Rules[i])
217+
baseRules := &params.Transform.Filter.Rules[i]
218+
if baseRules.Type == api.KeepEntryAllSatisfied {
219+
pr := predicatesRule{sampling: baseRules.KeepEntrySampling}
220+
for _, keepRule := range baseRules.KeepEntryAllSatisfied {
221+
pred, err := filters.FromKeepEntry(keepRule)
222+
if err != nil {
223+
return nil, err
224+
}
225+
pr.predicates = append(pr.predicates, pred)
226+
}
227+
keepRules = append(keepRules, pr)
255228
} else {
256-
rules = append(rules, params.Transform.Filter.Rules[i])
229+
rules = append(rules, *baseRules)
257230
}
258231
}
259232
}

pkg/utils/convert.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,9 @@ func ConvertToBool(unk interface{}) (bool, error) {
246246
func ConvertToString(unk interface{}) string {
247247
switch i := unk.(type) {
248248
case float64:
249-
return strconv.FormatFloat(i, 'E', -1, 64)
249+
return strconv.FormatFloat(i, 'f', -1, 64)
250250
case float32:
251-
return strconv.FormatFloat(float64(i), 'E', -1, 32)
251+
return strconv.FormatFloat(float64(i), 'f', -1, 32)
252252
case int64:
253253
return strconv.FormatInt(i, 10)
254254
case int32:

0 commit comments

Comments
 (0)