Commit e958fe7

moved add_if and add_regex_if to transform_filter from transform_network (#453)
1 parent a10ad6e commit e958fe7

8 files changed: +173 -194 lines changed
README.md

Lines changed: 5 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -389,10 +389,8 @@ Using `remove_entry_if_not_equal` will remove the entry if the specified field e
389389

390390
1. Resolve subnet from IP addresses
391391
1. Resolve known network service names from port numbers and protocols
392-
1. Perform simple mathematical transformations on field values
393392
1. Compute geo-location from IP addresses
394393
1. Resolve kubernetes information from IP addresses
395-
1. Perform regex operations on field values
396394

397395
Example configuration:
398396

@@ -408,15 +406,6 @@ parameters:
408406
output: srcSubnet
409407
type: add_subnet
410408
parameters: /24
411-
- input: value
412-
output: value_smaller_than10
413-
type: add_if
414-
parameters: <10
415-
- input: value
416-
output: dir
417-
type: add_if
418-
parameters: ==1
419-
assignee: in
420409
- input: dstPort
421410
output: service
422411
type: add_service
@@ -427,36 +416,25 @@ parameters:
427416
- input: srcIP
428417
output: srcK8S
429418
type: add_kubernetes
430-
- input: srcSubnet
431-
output: match-10.0
432-
type: add_regex_if
433-
parameters: 10.0.*
434419
```
435420

436-
The first rule `add_subnet` generates a new field named `srcSubnet` with the
421+
The rule `add_subnet` generates a new field named `srcSubnet` with the
437422
subnet of `srcIP` calculated based on prefix length from the `parameters` field
438423

439-
The second `add_if` generates a new field named `value_smaller_than10` that contains
440-
the contents of the `value` field for entries that satisfy the condition specified
441-
in the `parameters` variable (smaller than 10 in the example above). In addition, the
442-
field `value_smaller_than10_Evaluate` with value `true` is added to all satisfied
443-
entries. if `assignee` field is set, then on satified parmater i.e. if parameter evalutes true then
444-
`output` value will get value of `assignee` key.
445-
446-
The third rule `add_service` generates a new field named `service` with the known network
424+
The rule `add_service` generates a new field named `service` with the known network
447425
service name of `dstPort` port and `protocol` protocol. Unrecognized ports are ignored
448426
> Note: `protocol` can be either network protocol name or number
449427
>
450428
> Note: optionally supports custom network services resolution by defining configuration parameters
451429
> `servicesFile` and `protocolsFile` with paths to custom services/protocols files respectively
452430

453-
The fourth rule `add_location` generates new fields with the geo-location information retrieved
431+
The rule `add_location` generates new fields with the geo-location information retrieved
454432
from DB [ip2location](https://lite.ip2location.com/) based on `dstIP` IP.
455433
All the geo-location fields will be named by appending `output` value
456434
(`dstLocation` in the example above) to their names in the [ip2location](https://lite.ip2location.com/ DB
457435
(e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`)
458436

459-
The fifth rule `add_kubernetes` generates new fields with kubernetes information by
437+
The rule `add_kubernetes` generates new fields with kubernetes information by
460438
matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
461439
All the kubernetes fields will be named by appending `output` value
462440
(`srcK8S` in the example above) to the kubernetes metadata field names
@@ -470,13 +448,7 @@ will be generated, and named by appending `parameters` value to the label keys.
470448
> 2. using `KUBECONFIG` environment variable
471449
> 3. using local `~/.kube/config`
472450

473-
The sixth rule `add_regex_if` generates a new field named `match-10.0` that contains
474-
the contents of the `srcSubnet` field for entries that match regex expression specified
475-
in the `parameters` variable. In addition, the field `match-10.0_Matched` with
476-
value `true` is added to all matched entries
477-
478-
479-
> Note: above example describes all available transform network `Type` options
451+
> Note: above example describes the most common available transform network `Type` options
480452

481453
> Note: above transform is essential for the `aggregation` phase
482454

docs/api.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,19 @@ Following is the supported API format for filter transformations:
131131
filter:
132132
rules: list of filter rules, each includes:
133133
input: entry input field
134+
output: entry output field
134135
type: (enum) one of the following:
135136
remove_field: removes the field from the entry
136137
remove_entry_if_exists: removes the entry if the field exists
137138
remove_entry_if_doesnt_exist: removes the entry if the field does not exist
138139
remove_entry_if_equal: removes the entry if the field value equals specified value
139140
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
140141
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
142+
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
143+
add_regex_if: add output field if input field satisfies regex pattern from parameters field
141144
value: specified value of input field:
145+
parameters: parameters specific to type
146+
assignee: value needs to assign to output field
142147
</pre>
143148
## Transform Network API
144149
Following is the supported API format for network transformations:
@@ -149,8 +154,6 @@ Following is the supported API format for network transformations:
149154
input: entry input field
150155
output: entry output field
151156
type: (enum) one of the following:
152-
add_regex_if: add output field if input field satisfies regex pattern from parameters field
153-
add_if: add output field if input field satisfies criteria from parameters field
154157
add_subnet: add output subnet field from input field and prefix length from parameters field
155158
add_location: add output location fields from input
156159
add_service: add output network service field from input port and parameters protocol field

pkg/api/transform_filter.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,19 @@ type TransformFilterOperationEnum struct {
2828
RemoveEntryIfEqual string `yaml:"remove_entry_if_equal" json:"remove_entry_if_equal" doc:"removes the entry if the field value equals specified value"`
2929
RemoveEntryIfNotEqual string `yaml:"remove_entry_if_not_equal" json:"remove_entry_if_not_equal" doc:"removes the entry if the field value does not equal specified value"`
3030
AddFieldIfDoesntExist string `yaml:"add_field_if_doesnt_exist" json:"add_field_if_doesnt_exist" doc:"adds a field to the entry if the field does not exist"`
31+
AddFieldIf string `yaml:"add_field_if" json:"add_field_if" doc:"add output field set to assignee if input field satisfies criteria from parameters field"`
32+
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
3133
}
3234

3335
func TransformFilterOperationName(operation string) string {
3436
return GetEnumName(TransformFilterOperationEnum{}, operation)
3537
}
3638

3739
type TransformFilterRule struct {
38-
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
39-
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
40-
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
40+
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
41+
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
42+
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
43+
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
44+
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
45+
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
4146
}
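
For readers migrating configuration, the sketch below re-expresses the `add_if` and `add_regex_if` rules that this commit removes from the README's network example as filter rules. `filterRule` and `migratedRules` are hypothetical names: `filterRule` is a local stand-in that mirrors the fields of `api.TransformFilterRule` shown in this diff, so the snippet compiles without the repository. Note that the conditional type is spelled `add_field_if` after the move, per the new enum tag.

```go
package main

import "fmt"

// filterRule is a local mirror of api.TransformFilterRule from the diff above
// (hypothetical stand-in so this sketch has no external dependencies).
type filterRule struct {
	Input      string
	Output     string
	Type       string
	Value      interface{}
	Parameters string
	Assignee   string
}

// migratedRules returns the three rules removed from the README's transform
// network example, rewritten with the filter rule fields and type names.
func migratedRules() []filterRule {
	return []filterRule{
		{Input: "value", Output: "value_smaller_than10", Type: "add_field_if", Parameters: "<10"},
		{Input: "value", Output: "dir", Type: "add_field_if", Parameters: "==1", Assignee: "in"},
		{Input: "srcSubnet", Output: "match-10.0", Type: "add_regex_if", Parameters: "10.0.*"},
	}
}

func main() {
	for _, r := range migratedRules() {
		fmt.Printf("%s: %s -> %s (parameters %q, assignee %q)\n",
			r.Type, r.Input, r.Output, r.Parameters, r.Assignee)
	}
}
```

In the old README example `srcSubnet` was produced by `add_subnet` in the same network transform. After the move, a regex rule on `srcSubnet` presumably has to live in a filter stage that runs after the network stage that creates the field.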

pkg/api/transform_network.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
3939
}
4040

4141
const (
42-
OpAddRegexIf = "add_regex_if"
43-
OpAddIf = "add_if"
4442
OpAddSubnet = "add_subnet"
4543
OpAddLocation = "add_location"
4644
OpAddService = "add_service"
@@ -50,8 +48,6 @@ const (
5048
)
5149

5250
type TransformNetworkOperationEnum struct {
53-
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
54-
AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"`
5551
AddSubnet string `yaml:"add_subnet" json:"add_subnet" doc:"add output subnet field from input field and prefix length from parameters field"`
5652
AddLocation string `yaml:"add_location" json:"add_location" doc:"add output location fields from input"`
5753
AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"`

pkg/pipeline/transform/transform_filter.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
package transform
1919

2020
import (
21+
"fmt"
22+
"regexp"
23+
24+
"github.com/Knetic/govaluate"
2125
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2226
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2327
"github.com/sirupsen/logrus"
@@ -62,6 +66,31 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
6266
if _, ok := entry[rule.Input]; !ok {
6367
outputEntry[rule.Input] = rule.Value
6468
}
69+
case api.TransformFilterOperationName("AddRegExIf"):
70+
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
71+
if err != nil {
72+
continue
73+
}
74+
if matched {
75+
outputEntry[rule.Output] = outputEntry[rule.Input]
76+
outputEntry[rule.Output+"_Matched"] = true
77+
}
78+
case api.TransformFilterOperationName("AddFieldIf"):
79+
expressionString := fmt.Sprintf("val %s", rule.Parameters)
80+
expression, err := govaluate.NewEvaluableExpression(expressionString)
81+
if err != nil {
82+
log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
83+
continue
84+
}
85+
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
86+
if evaluateErr == nil && result.(bool) {
87+
if rule.Assignee != "" {
88+
outputEntry[rule.Output] = rule.Assignee
89+
} else {
90+
outputEntry[rule.Output] = outputEntry[rule.Input]
91+
}
92+
outputEntry[rule.Output+"_Evaluate"] = true
93+
}
6594
default:
6695
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
6796
}
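
The `add_regex_if` case above formats the input with `%s` and passes the rule's `parameters` straight to `regexp.MatchString`. The sketch below isolates that check using only the standard library, to show how patterns such as `10.0.*` behave. `matchesRule` is a hypothetical helper, not a function from the repository.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesRule reproduces the check from the add_regex_if case: the input value
// is formatted with %s and tested against the pattern with regexp.MatchString.
// A pattern that fails to compile is reported as "no match", mirroring the
// `continue` in the diff.
func matchesRule(pattern string, value interface{}) bool {
	matched, err := regexp.MatchString(pattern, fmt.Sprintf("%s", value))
	return err == nil && matched
}

func main() {
	// Pattern and value taken from the test in this commit.
	fmt.Println(matchesRule("10.0.*", "10.0.0.0/24"))
	fmt.Println(matchesRule("11.0.*", "10.0.0.0/24"))

	// MatchString is unanchored and "." matches any character, so the same
	// pattern also matches in the middle of a different address.
	fmt.Println(matchesRule("10.0.*", "110.0.0.0/24"))

	// Anchoring and escaping narrows the match to the intended prefix.
	fmt.Println(matchesRule(`^10\.0\.`, "110.0.0.0/24"))

	// %s applied to a non-string renders as "%!s(int=443)", so an anchored
	// pattern does not match a numeric field value.
	fmt.Println(matchesRule("^443$", 443))
}
```

Under these assumptions only the first and third calls report a match. Anchoring the pattern (`^...`) and escaping literal dots is the usual way to avoid the third case, and numeric inputs would need `%v` formatting to be matched by their digits alone.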

pkg/pipeline/transform/transform_filter_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package transform
2020
import (
2121
"testing"
2222

23+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2324
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2425
"github.com/netobserv/flowlogs-pipeline/pkg/test"
2526
"github.com/stretchr/testify/require"
@@ -230,3 +231,128 @@ func InitNewTransformFilter(t *testing.T, configFile string) Transformer {
230231
require.NoError(t, err)
231232
return newTransform
232233
}
234+
235+
func Test_Transform_AddIfScientificNotation(t *testing.T) {
236+
newNetworkFilter := Filter{
237+
Rules: []api.TransformFilterRule{
238+
{
239+
Input: "value",
240+
Output: "bigger_than_10",
241+
Type: "add_field_if",
242+
Parameters: ">10",
243+
},
244+
{
245+
Input: "value",
246+
Output: "smaller_than_10",
247+
Type: "add_field_if",
248+
Parameters: "<10",
249+
},
250+
{
251+
Input: "value",
252+
Output: "dir",
253+
Assignee: "in",
254+
Type: "add_field_if",
255+
Parameters: "==1",
256+
},
257+
{
258+
Input: "value",
259+
Output: "dir",
260+
Assignee: "out",
261+
Type: "add_field_if",
262+
Parameters: "==0",
263+
},
264+
{
265+
Input: "value",
266+
Output: "not_one",
267+
Assignee: "true",
268+
Type: "add_field_if",
269+
Parameters: "!=1",
270+
},
271+
{
272+
Input: "value",
273+
Output: "not_one",
274+
Assignee: "false",
275+
Type: "add_field_if",
276+
Parameters: "==1",
277+
},
278+
},
279+
}
280+
281+
var entry config.GenericMap
282+
entry = config.GenericMap{
283+
"value": 1.2345e67,
284+
}
285+
output, ok := newNetworkFilter.Transform(entry)
286+
require.True(t, ok)
287+
require.Equal(t, true, output["bigger_than_10_Evaluate"])
288+
require.Equal(t, 1.2345e67, output["bigger_than_10"])
289+
require.Equal(t, "true", output["not_one"])
290+
291+
entry = config.GenericMap{
292+
"value": 1.2345e-67,
293+
}
294+
output, ok = newNetworkFilter.Transform(entry)
295+
require.True(t, ok)
296+
require.Equal(t, true, output["smaller_than_10_Evaluate"])
297+
require.Equal(t, 1.2345e-67, output["smaller_than_10"])
298+
require.Equal(t, "true", output["not_one"])
299+
300+
entry = config.GenericMap{
301+
"value": 1,
302+
}
303+
output, ok = newNetworkFilter.Transform(entry)
304+
require.True(t, ok)
305+
require.Equal(t, true, output["dir_Evaluate"])
306+
require.Equal(t, "in", output["dir"])
307+
require.Equal(t, "false", output["not_one"])
308+
309+
entry = config.GenericMap{
310+
"value": 0,
311+
}
312+
output, ok = newNetworkFilter.Transform(entry)
313+
require.True(t, ok)
314+
require.Equal(t, true, output["dir_Evaluate"])
315+
require.Equal(t, "out", output["dir"])
316+
require.Equal(t, "true", output["not_one"])
317+
}
318+
319+
func Test_TransformFilterDependentRulesAddRegExIf(t *testing.T) {
320+
var yamlConfig = []byte(`
321+
log-level: debug
322+
pipeline:
323+
- name: transform1
324+
- name: write1
325+
follows: transform1
326+
parameters:
327+
- name: transform1
328+
transform:
329+
type: filter
330+
filter:
331+
rules:
332+
- input: subnetSrcIP
333+
type: add_field_if_doesnt_exist
334+
value: 10.0.0.0/24
335+
- input: subnetSrcIP
336+
output: match-10.0.*
337+
type: add_regex_if
338+
parameters: 10.0.*
339+
- input: subnetSrcIP
340+
output: match-11.0.*
341+
type: add_regex_if
342+
parameters: 11.0.*
343+
- name: write1
344+
write:
345+
type: stdout
346+
`)
347+
newNetworkFilter := InitNewTransformFilter(t, string(yamlConfig)).(*Filter)
348+
require.NotNil(t, newNetworkFilter)
349+
350+
entry := test.GetIngestMockEntry(false)
351+
output, ok := newNetworkFilter.Transform(entry)
352+
require.True(t, ok)
353+
354+
require.Equal(t, "10.0.0.1", output["srcIP"])
355+
require.Equal(t, "10.0.0.0/24", output["subnetSrcIP"])
356+
require.Equal(t, "10.0.0.0/24", output["match-10.0.*"])
357+
require.NotEqual(t, "10.0.0.0/24", output["match-11.0.*"])
358+
}
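
The `Test_Transform_AddIfScientificNotation` test above relies on rules being applied in order, with several rules sharing one `output`. The sketch below models that behavior with the standard library only. `condRule`, `applyRules` and `dirRules` are hypothetical names, and `Cond` is a plain Go predicate swapped in for the govaluate expression that the real code builds from `parameters`. It also assumes `float64` inputs, whereas the real test passes both integers and floats.

```go
package main

import "fmt"

// condRule is a hypothetical stand-in for an add_field_if rule. Cond replaces
// the govaluate expression built from `parameters`, so the sketch needs only
// the standard library.
type condRule struct {
	Input, Output, Assignee string
	Cond                    func(v float64) bool
}

// applyRules walks the rules in order, as the Transform loop in the diff does.
// A satisfied rule writes Output (the assignee if set, otherwise the input
// value) plus an "<Output>_Evaluate" marker; an unsatisfied rule writes nothing.
func applyRules(entry map[string]interface{}, rules []condRule) map[string]interface{} {
	out := make(map[string]interface{}, len(entry))
	for k, v := range entry {
		out[k] = v
	}
	for _, r := range rules {
		v, ok := out[r.Input].(float64)
		if !ok || !r.Cond(v) {
			continue
		}
		if r.Assignee != "" {
			out[r.Output] = r.Assignee
		} else {
			out[r.Output] = out[r.Input]
		}
		out[r.Output+"_Evaluate"] = true
	}
	return out
}

// dirRules mirrors the paired rules from the test: two rules share the "dir"
// output and have mutually exclusive conditions.
func dirRules() []condRule {
	return []condRule{
		{Input: "value", Output: "dir", Assignee: "in", Cond: func(v float64) bool { return v == 1 }},
		{Input: "value", Output: "dir", Assignee: "out", Cond: func(v float64) bool { return v == 0 }},
		{Input: "value", Output: "bigger_than_10", Cond: func(v float64) bool { return v > 10 }},
	}
}

func main() {
	for _, v := range []float64{1, 0, 1.2345e67} {
		fmt.Println(applyRules(map[string]interface{}{"value": v}, dirRules()))
	}
}
```

Pairing two rules with complementary conditions on the same `output` is how the test emulates an if/else. Because `Assignee` is a string field, the test's `not_one` output holds the strings `"true"` and `"false"` rather than booleans.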

pkg/pipeline/transform/transform_network.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ import (
2121
"fmt"
2222
"net"
2323
"os"
24-
"regexp"
2524
"strconv"
2625
"time"
2726

28-
"github.com/Knetic/govaluate"
2927
"github.com/netobserv/flowlogs-pipeline/pkg/api"
3028
"github.com/netobserv/flowlogs-pipeline/pkg/config"
3129
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
@@ -56,31 +54,6 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
5654
// TODO: for efficiency and maintainability, maybe each case in the switch below should be an individual implementation of Transformer
5755
for _, rule := range n.Rules {
5856
switch rule.Type {
59-
case api.OpAddRegexIf:
60-
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
61-
if err != nil {
62-
continue
63-
}
64-
if matched {
65-
outputEntry[rule.Output] = outputEntry[rule.Input]
66-
outputEntry[rule.Output+"_Matched"] = true
67-
}
68-
case api.OpAddIf:
69-
expressionString := fmt.Sprintf("val %s", rule.Parameters)
70-
expression, err := govaluate.NewEvaluableExpression(expressionString)
71-
if err != nil {
72-
log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
73-
continue
74-
}
75-
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
76-
if evaluateErr == nil && result.(bool) {
77-
if rule.Assignee != "" {
78-
outputEntry[rule.Output] = rule.Assignee
79-
} else {
80-
outputEntry[rule.Output] = outputEntry[rule.Input]
81-
}
82-
outputEntry[rule.Output+"_Evaluate"] = true
83-
}
8457
case api.OpAddSubnet:
8558
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters))
8659
if err != nil {
